You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2017/02/07 06:38:16 UTC

spark git commit: [SPARK-18967][SCHEDULER] compute locality levels even if delay = 0

Repository: spark
Updated Branches:
  refs/heads/master 7a0a630e0 -> d9043092c


[SPARK-18967][SCHEDULER] compute locality levels even if delay = 0

## What changes were proposed in this pull request?

Before this change, with delay scheduling off, Spark would effectively
ignore locality preferences for bulk scheduling.  With this change,
locality preferences are used when multiple offers are made
simultaneously.

## How was this patch tested?

Test case added which fails without this change.  All unit tests run via jenkins.

Author: Imran Rashid <ir...@cloudera.com>

Closes #16376 from squito/locality_without_delay.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9043092
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9043092
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9043092

Branch: refs/heads/master
Commit: d9043092caf71d5fa6be18ae8c51a0158bc2218e
Parents: 7a0a630
Author: Imran Rashid <ir...@cloudera.com>
Authored: Mon Feb 6 22:37:37 2017 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Mon Feb 6 22:37:37 2017 -0800

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     | 13 ++-
 .../apache/spark/scheduler/TaskSetManager.scala | 13 ++-
 .../scheduler/TaskSchedulerImplSuite.scala      | 87 +++++++++++++++++++-
 3 files changed, 105 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9043092/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 72ed55a..8ce2ca3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
 private[spark] class TaskSchedulerImpl private[scheduler](
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    blacklistTrackerOpt: Option[BlacklistTracker],
+    private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
@@ -337,8 +337,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
       }
     }.getOrElse(offers)
 
-    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
-    val shuffledOffers = Random.shuffle(filteredOffers)
+    val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
@@ -375,6 +374,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
     return tasks
   }
 
+  /**
+   * Shuffle offers around to avoid always placing tasks on the same workers.  Exposed to allow
+   * overriding in tests, so it can be deterministic.
+   */
+  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+    Random.shuffle(offers)
+  }
+
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
     var reason: Option[ExecutorLossReason] = None

http://git-wip-us.apache.org/repos/asf/spark/blob/d9043092/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8825143..3b25513 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -163,7 +163,12 @@ private[spark] class TaskSetManager(
     addPendingTask(i)
   }
 
-  // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
+  /**
+   * Track the set of locality levels which are valid given the tasks' locality preferences and
+   * the set of currently available executors.  This is updated as executors are added and removed.
+   * This allows a performance optimization, of skipping levels that aren't relevant (e.g., skip
+   * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
+   */
   var myLocalityLevels = computeValidLocalityLevels()
   var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
 
@@ -961,18 +966,18 @@ private[spark] class TaskSetManager(
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
     val levels = new ArrayBuffer[TaskLocality.TaskLocality]
-    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
+    if (!pendingTasksForExecutor.isEmpty &&
         pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
       levels += PROCESS_LOCAL
     }
-    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
+    if (!pendingTasksForHost.isEmpty &&
         pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
       levels += NODE_LOCAL
     }
     if (!pendingTasksWithNoPrefs.isEmpty) {
       levels += NO_PREF
     }
-    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
+    if (!pendingTasksForRack.isEmpty &&
         pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
       levels += RACK_LOCAL
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9043092/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 304dc9d..9ae0bcd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.mock.MockitoSugar
 import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.ManualClock
 
 class FakeSchedulerBackend extends SchedulerBackend {
   def start() {}
@@ -819,4 +819,89 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
     assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
   }
+
+  test("Locality should be used for bulk offers even with delay scheduling off") {
+    val conf = new SparkConf()
+      .set("spark.locality.wait", "0")
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+    // we create a manual clock just so we can be sure the clock doesn't advance at all in this test
+    val clock = new ManualClock()
+
+    // We customize the task scheduler just to let us control the way offers are shuffled, so we
+    // can be sure we try both permutations, and to control the clock on the tasksetmanager.
+    val taskScheduler = new TaskSchedulerImpl(sc) {
+      override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+        // Don't shuffle the offers around for this test.  Instead, we'll just pass in all
+        // the permutations we care about directly.
+        offers
+      }
+      override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
+        new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
+      }
+    }
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+    taskScheduler.initialize(new FakeSchedulerBackend)
+
+    // Make two different offers -- one in the preferred location, one that is not.
+    val offers = IndexedSeq(
+      WorkerOffer("exec1", "host1", 1),
+      WorkerOffer("exec2", "host2", 1)
+    )
+    Seq(false, true).foreach { swapOrder =>
+      // Submit a taskset with locality preferences.
+      val taskSet = FakeTask.createTaskSet(
+        1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1")))
+      taskScheduler.submitTasks(taskSet)
+      val shuffledOffers = if (swapOrder) offers.reverse else offers
+      // Regardless of the order of the offers (after the task scheduler shuffles them), we should
+      // always take advantage of the local offer.
+      val taskDescs = taskScheduler.resourceOffers(shuffledOffers).flatten
+      withClue(s"swapOrder = $swapOrder") {
+        assert(taskDescs.size === 1)
+        assert(taskDescs.head.executorId === "exec1")
+      }
+    }
+  }
+
+  test("With delay scheduling off, tasks can be run at any locality level immediately") {
+    val conf = new SparkConf()
+      .set("spark.locality.wait", "0")
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+
+    // we create a manual clock just so we can be sure the clock doesn't advance at all in this test
+    val clock = new ManualClock()
+    val taskScheduler = new TaskSchedulerImpl(sc) {
+      override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
+        new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
+      }
+    }
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // make an offer on the preferred host so the scheduler knows it's alive.  This is necessary
+    // so that the taskset knows that it *could* take advantage of locality.
+    taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
+
+    // Submit a taskset with locality preferences.
+    val taskSet = FakeTask.createTaskSet(
+      1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1")))
+    taskScheduler.submitTasks(taskSet)
+    val tsm = taskScheduler.taskSetManagerForAttempt(1, 0).get
+    // make sure we've set up our test correctly, so that the taskset knows it *could* use local
+    // offers.
+    assert(tsm.myLocalityLevels.contains(TaskLocality.NODE_LOCAL))
+    // make an offer on a non-preferred location.  Since the delay is 0, we should still schedule
+    // immediately.
+    val taskDescs =
+      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec2", "host2", 1))).flatten
+    assert(taskDescs.size === 1)
+    assert(taskDescs.head.executorId === "exec2")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org