You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/09/29 19:32:30 UTC
spark git commit: [SPARK-17648][CORE] TaskScheduler really needs
offers to be an IndexedSeq
Repository: spark
Updated Branches:
refs/heads/master 958200497 -> 7f779e743
[SPARK-17648][CORE] TaskScheduler really needs offers to be an IndexedSeq
## What changes were proposed in this pull request?
The Seq[WorkerOffer] is accessed by index, so it really should be an
IndexedSeq, otherwise an O(n) operation becomes O(n^2). In practice
this hasn't been an issue because where these offers are generated, the call
to `.toSeq` just happens to create an IndexedSeq anyway. I got bitten by
this in performance tests I was doing, and it's better for the types to be
more precise so e.g. a change in Scala doesn't destroy performance.
## How was this patch tested?
Unit tests via jenkins.
Author: Imran Rashid <ir...@cloudera.com>
Closes #15221 from squito/SPARK-17648.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f779e74
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f779e74
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f779e74
Branch: refs/heads/master
Commit: 7f779e7439127efa0e3611f7745e1c8423845198
Parents: 9582004
Author: Imran Rashid <ir...@cloudera.com>
Authored: Thu Sep 29 15:36:40 2016 -0400
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Sep 29 15:36:40 2016 -0400
----------------------------------------------------------------------
.../spark/scheduler/TaskSchedulerImpl.scala | 4 +--
.../cluster/CoarseGrainedSchedulerBackend.scala | 4 +--
.../scheduler/local/LocalSchedulerBackend.scala | 2 +-
.../scheduler/SchedulerIntegrationSuite.scala | 7 ++---
.../scheduler/TaskSchedulerImplSuite.scala | 32 ++++++++++----------
.../MesosFineGrainedSchedulerBackend.scala | 2 +-
.../MesosFineGrainedSchedulerBackendSuite.scala | 2 +-
7 files changed, 26 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 52a7186..0ad4730 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -252,7 +252,7 @@ private[spark] class TaskSchedulerImpl(
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
- tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
+ tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
@@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl(
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster.
*/
- def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+ def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index edc3c19..2d09863 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -216,7 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
- }.toSeq
+ }.toIndexedSeq
launchTasks(scheduler.resourceOffers(workOffers))
}
@@ -233,7 +233,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Filter out executors under killing
if (executorIsAlive(executorId)) {
val executorData = executorDataMap(executorId)
- val workOffers = Seq(
+ val workOffers = IndexedSeq(
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
launchTasks(scheduler.resourceOffers(workOffers))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index e386052..7a73e8e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -81,7 +81,7 @@ private[spark] class LocalEndpoint(
}
def reviveOffers() {
- val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
+ val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 14f52a6..5cd548b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -366,13 +366,13 @@ private[spark] abstract class MockBackend(
*/
def executorIdToExecutor: Map[String, ExecutorTaskStatus]
- private def generateOffers(): Seq[WorkerOffer] = {
+ private def generateOffers(): IndexedSeq[WorkerOffer] = {
executorIdToExecutor.values.filter { exec =>
exec.freeCores > 0
}.map { exec =>
WorkerOffer(executorId = exec.executorId, host = exec.host,
cores = exec.freeCores)
- }.toSeq
+ }.toIndexedSeq
}
/**
@@ -381,8 +381,7 @@ private[spark] abstract class MockBackend(
* scheduling.
*/
override def reviveOffers(): Unit = {
- val offers: Seq[WorkerOffer] = generateOffers()
- val newTaskDescriptions = taskScheduler.resourceOffers(offers).flatten
+ val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
// get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
// tests from introducing a race if they need it
val newTasks = taskScheduler.synchronized {
http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 100b157..61787b5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -87,7 +87,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("Scheduler does not always schedule tasks on the same workers") {
val taskScheduler = setupScheduler()
val numFreeCores = 1
- val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
new WorkerOffer("executor1", "host1", numFreeCores))
// Repeatedly try to schedule a 1-task job, and make sure that it doesn't always
// get scheduled on the same executor. While there is a chance this test will fail
@@ -112,7 +112,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
val taskCpus = 2
val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
// Give zero core offers. Should not generate any tasks
- val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
+ val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
new WorkerOffer("executor1", "host1", 0))
val taskSet = FakeTask.createTaskSet(1)
taskScheduler.submitTasks(taskSet)
@@ -121,7 +121,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// No tasks should run as we only have 1 core free.
val numFreeCores = 1
- val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+ val singleCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
@@ -129,7 +129,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// Now change the offers to have 2 cores in one executor and verify if it
// is chosen.
- val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+ val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
@@ -144,7 +144,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
val numFreeCores = 1
val taskSet = new TaskSet(
Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
- val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+ val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
@@ -184,7 +184,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
val taskScheduler = setupScheduler()
val numFreeCores = 1
- val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
val attempt1 = FakeTask.createTaskSet(10)
// submit attempt 1, offer some resources, some tasks get scheduled
@@ -216,7 +216,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
val taskScheduler = setupScheduler()
val numFreeCores = 10
- val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
val attempt1 = FakeTask.createTaskSet(10)
// submit attempt 1, offer some resources, some tasks get scheduled
@@ -254,8 +254,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("tasks are not re-scheduled while executor loss reason is pending") {
val taskScheduler = setupScheduler()
- val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
- val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
+ val e0Offers = IndexedSeq(new WorkerOffer("executor0", "host0", 1))
+ val e1Offers = IndexedSeq(new WorkerOffer("executor1", "host0", 1))
val attempt1 = FakeTask.createTaskSet(1)
// submit attempt 1, offer resources, task gets scheduled
@@ -296,7 +296,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
taskScheduler.submitTasks(taskSet)
val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
- val firstTaskAttempts = taskScheduler.resourceOffers(Seq(
+ val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
new WorkerOffer("executor0", "host0", 1),
new WorkerOffer("executor1", "host1", 1)
)).flatten
@@ -313,7 +313,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// on that executor, and make sure that the other task (not the failed one) is assigned there
taskScheduler.executorLost("executor1", SlaveLost("oops"))
val nextTaskAttempts =
- taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1))).flatten
+ taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten
// Note: Its OK if some future change makes this already realize the taskset has become
// unschedulable at this point (though in the current implementation, we're sure it will not)
assert(nextTaskAttempts.size === 1)
@@ -323,7 +323,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// now we should definitely realize that our task set is unschedulable, because the only
// task left can't be scheduled on any executors due to the blacklist
- taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
+ taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
sc.listenerBus.waitUntilEmpty(100000)
assert(tsm.isZombie)
assert(failedTaskSet)
@@ -348,7 +348,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
taskScheduler.submitTasks(taskSet)
val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
- val offers = Seq(
+ val offers = IndexedSeq(
// each offer has more than enough free cores for the entire task set, so when combined
// with the locality preferences, we schedule all tasks on one executor
new WorkerOffer("executor0", "host0", 4),
@@ -380,7 +380,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
(0 until 2).map { _ => Seq(TaskLocation("host0", "executor2"))}: _*
))
- val taskDescs = taskScheduler.resourceOffers(Seq(
+ val taskDescs = taskScheduler.resourceOffers(IndexedSeq(
new WorkerOffer("executor0", "host0", 1),
new WorkerOffer("executor1", "host1", 1)
)).flatten
@@ -396,7 +396,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// when executor2 is added, we should realize that we can run process-local tasks.
// And we should know its alive on the host.
val secondTaskDescs = taskScheduler.resourceOffers(
- Seq(new WorkerOffer("executor2", "host0", 1))).flatten
+ IndexedSeq(new WorkerOffer("executor2", "host0", 1))).flatten
assert(secondTaskDescs.size === 1)
assert(mgr.myLocalityLevels.toSet ===
Set(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY))
@@ -406,7 +406,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// And even if we don't have anything left to schedule, another resource offer on yet another
// executor should also update the set of live executors
val thirdTaskDescs = taskScheduler.resourceOffers(
- Seq(new WorkerOffer("executor3", "host1", 1))).flatten
+ IndexedSeq(new WorkerOffer("executor3", "host1", 1))).flatten
assert(thirdTaskDescs.size === 0)
assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index eb3b235..09a252f 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -286,7 +286,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
o.getSlaveId.getValue,
o.getHostname,
cpus)
- }
+ }.toIndexedSeq
val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 7a706ab..1d7a86f 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -283,7 +283,7 @@ class MesosFineGrainedSchedulerBackendSuite
mesosOffers2.add(createOffer(1, minMem, minCpu))
reset(taskScheduler)
reset(driver)
- when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
+ when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]))).thenReturn(Seq(Seq()))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org