You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2014/06/09 22:14:00 UTC
git commit: Added a TaskSetManager unit test.
Repository: spark
Updated Branches:
refs/heads/master 0cf600280 -> 6cf335d79
Added a TaskSetManager unit test.
This test ensures that when there are no
alive executors that satisfy a particular locality level,
the TaskSetManager doesn't ever use that as the maximum
allowed locality level (this optimization ensures that a
job doesn't wait extra time in an attempt to satisfy
a scheduling locality level that is impossible).
@mateiz and @lirui-intel this unit test illustrates an issue
with #892 (it fails with that patch).
Author: Kay Ousterhout <ka...@gmail.com>
Closes #1024 from kayousterhout/scheduler_unit_test and squashes the following commits:
de6a08f [Kay Ousterhout] Added a TaskSetManager unit test.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6cf335d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6cf335d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6cf335d7
Branch: refs/heads/master
Commit: 6cf335d79a2f69ecd9a139dd0a03acff60585be4
Parents: 0cf6002
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Mon Jun 9 13:13:53 2014 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Mon Jun 9 13:13:53 2014 -0700
----------------------------------------------------------------------
.../spark/scheduler/TaskSetManagerSuite.scala | 16 ++++++++++++++++
1 file changed, 16 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6cf335d7/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index c92b6dc..6f1fd25 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -141,6 +141,22 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.finishedManagers.contains(manager))
}
+ test("skip unsatisfiable locality levels") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
+ val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+ // An executor that is not NODE_LOCAL should be rejected.
+ assert(manager.resourceOffer("execC", "host2", ANY) === None)
+
+ // Because there are no alive PROCESS_LOCAL executors, the base locality level should be
+ // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before
+ // any of the locality wait timers expire.
+ assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0)
+ }
+
test("basic delay scheduling") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))