You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/09/29 19:32:30 UTC

spark git commit: [SPARK-17648][CORE] TaskScheduler really needs offers to be an IndexedSeq

Repository: spark
Updated Branches:
  refs/heads/master 958200497 -> 7f779e743


[SPARK-17648][CORE] TaskScheduler really needs offers to be an IndexedSeq

## What changes were proposed in this pull request?

The Seq[WorkerOffer] is accessed by index, so it really should be an
IndexedSeq; otherwise an O(n) operation becomes O(n^2).  In practice
this hasn't been an issue because where these offers are generated, the call
to `.toSeq` just happens to create an IndexedSeq anyway.  I got bitten by
this in performance tests I was doing, and it's better for the types to be
more precise so that, e.g., a change in Scala doesn't destroy performance.

## How was this patch tested?

Unit tests via jenkins.

Author: Imran Rashid <ir...@cloudera.com>

Closes #15221 from squito/SPARK-17648.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f779e74
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f779e74
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f779e74

Branch: refs/heads/master
Commit: 7f779e7439127efa0e3611f7745e1c8423845198
Parents: 9582004
Author: Imran Rashid <ir...@cloudera.com>
Authored: Thu Sep 29 15:36:40 2016 -0400
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Sep 29 15:36:40 2016 -0400

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  4 +--
 .../cluster/CoarseGrainedSchedulerBackend.scala |  4 +--
 .../scheduler/local/LocalSchedulerBackend.scala |  2 +-
 .../scheduler/SchedulerIntegrationSuite.scala   |  7 ++---
 .../scheduler/TaskSchedulerImplSuite.scala      | 32 ++++++++++----------
 .../MesosFineGrainedSchedulerBackend.scala      |  2 +-
 .../MesosFineGrainedSchedulerBackendSuite.scala |  2 +-
 7 files changed, 26 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 52a7186..0ad4730 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -252,7 +252,7 @@ private[spark] class TaskSchedulerImpl(
       maxLocality: TaskLocality,
       shuffledOffers: Seq[WorkerOffer],
       availableCpus: Array[Int],
-      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
+      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
     var launchedTask = false
     for (i <- 0 until shuffledOffers.size) {
       val execId = shuffledOffers(i).executorId
@@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl(
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
    * that tasks are balanced across the cluster.
    */
-  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
     // Mark each slave as alive and remember its hostname
     // Also track if new executor is added
     var newExecAvail = false

http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index edc3c19..2d09863 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -216,7 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
       val workOffers = activeExecutors.map { case (id, executorData) =>
         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
-      }.toSeq
+      }.toIndexedSeq
       launchTasks(scheduler.resourceOffers(workOffers))
     }
 
@@ -233,7 +233,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // Filter out executors under killing
       if (executorIsAlive(executorId)) {
         val executorData = executorDataMap(executorId)
-        val workOffers = Seq(
+        val workOffers = IndexedSeq(
           new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
         launchTasks(scheduler.resourceOffers(workOffers))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index e386052..7a73e8e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -81,7 +81,7 @@ private[spark] class LocalEndpoint(
   }
 
   def reviveOffers() {
-    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
+    val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
       freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,

http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 14f52a6..5cd548b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -366,13 +366,13 @@ private[spark] abstract class MockBackend(
    */
   def executorIdToExecutor: Map[String, ExecutorTaskStatus]
 
-  private def generateOffers(): Seq[WorkerOffer] = {
+  private def generateOffers(): IndexedSeq[WorkerOffer] = {
     executorIdToExecutor.values.filter { exec =>
       exec.freeCores > 0
     }.map { exec =>
       WorkerOffer(executorId = exec.executorId, host = exec.host,
         cores = exec.freeCores)
-    }.toSeq
+    }.toIndexedSeq
   }
 
   /**
@@ -381,8 +381,7 @@ private[spark] abstract class MockBackend(
    * scheduling.
    */
   override def reviveOffers(): Unit = {
-    val offers: Seq[WorkerOffer] = generateOffers()
-    val newTaskDescriptions = taskScheduler.resourceOffers(offers).flatten
+    val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
     // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
     // tests from introducing a race if they need it
     val newTasks = taskScheduler.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 100b157..61787b5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -87,7 +87,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("Scheduler does not always schedule tasks on the same workers") {
     val taskScheduler = setupScheduler()
     val numFreeCores = 1
-    val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
       new WorkerOffer("executor1", "host1", numFreeCores))
     // Repeatedly try to schedule a 1-task job, and make sure that it doesn't always
     // get scheduled on the same executor. While there is a chance this test will fail
@@ -112,7 +112,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val taskCpus = 2
     val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
     // Give zero core offers. Should not generate any tasks
-    val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
+    val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))
     val taskSet = FakeTask.createTaskSet(1)
     taskScheduler.submitTasks(taskSet)
@@ -121,7 +121,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
     // No tasks should run as we only have 1 core free.
     val numFreeCores = 1
-    val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+    val singleCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
       new WorkerOffer("executor1", "host1", numFreeCores))
     taskScheduler.submitTasks(taskSet)
     taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
@@ -129,7 +129,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
     // Now change the offers to have 2 cores in one executor and verify if it
     // is chosen.
-    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+    val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
       new WorkerOffer("executor1", "host1", numFreeCores))
     taskScheduler.submitTasks(taskSet)
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
@@ -144,7 +144,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val numFreeCores = 1
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
-    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+    val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
       new WorkerOffer("executor1", "host1", numFreeCores))
     taskScheduler.submitTasks(taskSet)
     var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
@@ -184,7 +184,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val taskScheduler = setupScheduler()
 
     val numFreeCores = 1
-    val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
     val attempt1 = FakeTask.createTaskSet(10)
 
     // submit attempt 1, offer some resources, some tasks get scheduled
@@ -216,7 +216,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val taskScheduler = setupScheduler()
 
     val numFreeCores = 10
-    val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
     val attempt1 = FakeTask.createTaskSet(10)
 
     // submit attempt 1, offer some resources, some tasks get scheduled
@@ -254,8 +254,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("tasks are not re-scheduled while executor loss reason is pending") {
     val taskScheduler = setupScheduler()
 
-    val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
-    val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
+    val e0Offers = IndexedSeq(new WorkerOffer("executor0", "host0", 1))
+    val e1Offers = IndexedSeq(new WorkerOffer("executor1", "host0", 1))
     val attempt1 = FakeTask.createTaskSet(1)
 
     // submit attempt 1, offer resources, task gets scheduled
@@ -296,7 +296,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     taskScheduler.submitTasks(taskSet)
     val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
 
-    val firstTaskAttempts = taskScheduler.resourceOffers(Seq(
+    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
       new WorkerOffer("executor0", "host0", 1),
       new WorkerOffer("executor1", "host1", 1)
     )).flatten
@@ -313,7 +313,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // on that executor, and make sure that the other task (not the failed one) is assigned there
     taskScheduler.executorLost("executor1", SlaveLost("oops"))
     val nextTaskAttempts =
-      taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1))).flatten
+      taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten
     // Note: Its OK if some future change makes this already realize the taskset has become
     // unschedulable at this point (though in the current implementation, we're sure it will not)
     assert(nextTaskAttempts.size === 1)
@@ -323,7 +323,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
     // now we should definitely realize that our task set is unschedulable, because the only
     // task left can't be scheduled on any executors due to the blacklist
-    taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
+    taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
     sc.listenerBus.waitUntilEmpty(100000)
     assert(tsm.isZombie)
     assert(failedTaskSet)
@@ -348,7 +348,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     taskScheduler.submitTasks(taskSet)
     val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
 
-    val offers = Seq(
+    val offers = IndexedSeq(
       // each offer has more than enough free cores for the entire task set, so when combined
       // with the locality preferences, we schedule all tasks on one executor
       new WorkerOffer("executor0", "host0", 4),
@@ -380,7 +380,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2"))}: _*
     ))
 
-    val taskDescs = taskScheduler.resourceOffers(Seq(
+    val taskDescs = taskScheduler.resourceOffers(IndexedSeq(
       new WorkerOffer("executor0", "host0", 1),
       new WorkerOffer("executor1", "host1", 1)
     )).flatten
@@ -396,7 +396,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // when executor2 is added, we should realize that we can run process-local tasks.
     // And we should know its alive on the host.
     val secondTaskDescs = taskScheduler.resourceOffers(
-      Seq(new WorkerOffer("executor2", "host0", 1))).flatten
+      IndexedSeq(new WorkerOffer("executor2", "host0", 1))).flatten
     assert(secondTaskDescs.size === 1)
     assert(mgr.myLocalityLevels.toSet ===
       Set(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY))
@@ -406,7 +406,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // And even if we don't have anything left to schedule, another resource offer on yet another
     // executor should also update the set of live executors
     val thirdTaskDescs = taskScheduler.resourceOffers(
-      Seq(new WorkerOffer("executor3", "host1", 1))).flatten
+      IndexedSeq(new WorkerOffer("executor3", "host1", 1))).flatten
     assert(thirdTaskDescs.size === 0)
     assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index eb3b235..09a252f 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -286,7 +286,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
           o.getSlaveId.getValue,
           o.getHostname,
           cpus)
-      }
+      }.toIndexedSeq
 
       val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
       val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap

http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 7a706ab..1d7a86f 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -283,7 +283,7 @@ class MesosFineGrainedSchedulerBackendSuite
     mesosOffers2.add(createOffer(1, minMem, minCpu))
     reset(taskScheduler)
     reset(driver)
-    when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
+    when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]))).thenReturn(Seq(Seq()))
     when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
     when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org