Posted to commits@spark.apache.org by pw...@apache.org on 2015/01/09 18:47:11 UTC

spark git commit: [SPARK-1143] Separate pool tests into their own suite.

Repository: spark
Updated Branches:
  refs/heads/master 1790b3869 -> b6aa55730


[SPARK-1143] Separate pool tests into their own suite.

The current TaskSchedulerImplSuite includes some tests that genuinely
exercise the TaskSchedulerImpl, but the remainder of the tests avoid using
the TaskSchedulerImpl entirely and instead test the pool and scheduling
algorithm mechanisms. This commit separates the pool/scheduling algorithm
tests into their own suite, and also simplifies those tests.
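
In outline, the new suite drives the pool and scheduling algorithm directly,
with no SchedulerBackend involved. A minimal sketch of the pattern
(createTaskSetManager and scheduleTaskAndVerifyId are the helpers defined in
the new PoolSuite below):

    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
    schedulableBuilder.buildPools()
    // Two single-task stages; FIFO should drain stage 0 before stage 1.
    schedulableBuilder.addTaskSetManager(createTaskSetManager(0, 1, taskScheduler), null)
    schedulableBuilder.addTaskSetManager(createTaskSetManager(1, 1, taskScheduler), null)
    scheduleTaskAndVerifyId(0, rootPool, 0)
    scheduleTaskAndVerifyId(1, rootPool, 1)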

This pull request replaces #339.

Author: Kay Ousterhout <ka...@gmail.com>

Closes #3967 from kayousterhout/SPARK-1143 and squashes the following commits:

8a898c4 [Kay Ousterhout] [SPARK-1143] Separate pool tests into their own suite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6aa5573
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6aa5573
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6aa5573

Branch: refs/heads/master
Commit: b6aa557300275b835cce7baa7bc8a80eb5425cbb
Parents: 1790b38
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Jan 9 09:47:06 2015 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Fri Jan 9 09:47:06 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/PoolSuite.scala  | 183 +++++++++++++++
 .../scheduler/TaskSchedulerImplSuite.scala      | 230 -------------------
 2 files changed, 183 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b6aa5573/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
new file mode 100644
index 0000000..e8f461e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Properties
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
+
+/**
+ * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
+ * correctly.
+ */
+class PoolSuite extends FunSuite with LocalSparkContext {
+
+  def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
+    : TaskSetManager = {
+    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+      new FakeTask(i, Nil)
+    }
+    new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
+  }
+
+  def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) {
+    val taskSetQueue = rootPool.getSortedTaskSetQueue
+    val nextTaskSetToSchedule =
+      taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
+    assert(nextTaskSetToSchedule.isDefined)
+    nextTaskSetToSchedule.get.addRunningTask(taskId)
+    assert(nextTaskSetToSchedule.get.stageId === expectedStageId)
+  }
+
+  test("FIFO Scheduler Test") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+
+    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
+    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+    schedulableBuilder.buildPools()
+
+    val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
+    val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
+    val taskSetManager2 = createTaskSetManager(2, 2, taskScheduler)
+    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
+    schedulableBuilder.addTaskSetManager(taskSetManager2, null)
+
+    scheduleTaskAndVerifyId(0, rootPool, 0)
+    scheduleTaskAndVerifyId(1, rootPool, 0)
+    scheduleTaskAndVerifyId(2, rootPool, 1)
+    scheduleTaskAndVerifyId(3, rootPool, 1)
+    scheduleTaskAndVerifyId(4, rootPool, 2)
+    scheduleTaskAndVerifyId(5, rootPool, 2)
+  }
+
+  /**
+   * This test creates three scheduling pools, and creates task set managers in the first
+   * two scheduling pools. The test verifies that as tasks are scheduled, the fair scheduling
+   * algorithm properly orders the two scheduling pools.
+   */
+  test("Fair Scheduler Test") {
+    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+    val taskScheduler = new TaskSchedulerImpl(sc)
+
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    schedulableBuilder.buildPools()
+
+    // Ensure that the XML file was read in correctly.
+    assert(rootPool.getSchedulableByName("default") != null)
+    assert(rootPool.getSchedulableByName("1") != null)
+    assert(rootPool.getSchedulableByName("2") != null)
+    assert(rootPool.getSchedulableByName("3") != null)
+    assert(rootPool.getSchedulableByName("1").minShare === 2)
+    assert(rootPool.getSchedulableByName("1").weight === 1)
+    assert(rootPool.getSchedulableByName("2").minShare === 3)
+    assert(rootPool.getSchedulableByName("2").weight === 1)
+    assert(rootPool.getSchedulableByName("3").minShare === 0)
+    assert(rootPool.getSchedulableByName("3").weight === 1)
+
+    val properties1 = new Properties()
+    properties1.setProperty("spark.scheduler.pool","1")
+    val properties2 = new Properties()
+    properties2.setProperty("spark.scheduler.pool","2")
+
+    val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
+    val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
+    val taskSetManager12 = createTaskSetManager(2, 2, taskScheduler)
+    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
+    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
+    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
+
+    val taskSetManager23 = createTaskSetManager(3, 2, taskScheduler)
+    val taskSetManager24 = createTaskSetManager(4, 2, taskScheduler)
+    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
+    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
+
+    // Pool 1 share ratio: 0. Pool 2 share ratio: 0. 1 gets scheduled based on ordering of names.
+    scheduleTaskAndVerifyId(0, rootPool, 0)
+    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 0. 2 gets scheduled because ratio is lower.
+    scheduleTaskAndVerifyId(1, rootPool, 3)
+    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 1/3. 2 gets scheduled because ratio is lower.
+    scheduleTaskAndVerifyId(2, rootPool, 3)
+    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 2/3. 1 gets scheduled because ratio is lower.
+    scheduleTaskAndVerifyId(3, rootPool, 1)
+    // Pool 1 share ratio: 1. Pool 2 share ratio: 2/3. 2 gets scheduled because ratio is lower.
+    scheduleTaskAndVerifyId(4, rootPool, 4)
+    // Neither pool is needy so ordering is based on number of running tasks.
+    // Pool 1 running tasks: 2, Pool 2 running tasks: 3. 1 gets scheduled because fewer running
+    // tasks.
+    scheduleTaskAndVerifyId(5, rootPool, 2)
+    // Pool 1 running tasks: 3, Pool 2 running tasks: 3. 1 gets scheduled because of naming
+    // ordering.
+    scheduleTaskAndVerifyId(6, rootPool, 2)
+    // Pool 1 running tasks: 4, Pool 2 running tasks: 3. 2 gets scheduled because fewer running
+    // tasks.
+    scheduleTaskAndVerifyId(7, rootPool, 4)
+  }
+
+  test("Nested Pool Test") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
+    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
+    rootPool.addSchedulable(pool0)
+    rootPool.addSchedulable(pool1)
+
+    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
+    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
+    pool0.addSchedulable(pool00)
+    pool0.addSchedulable(pool01)
+
+    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
+    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
+    pool1.addSchedulable(pool10)
+    pool1.addSchedulable(pool11)
+
+    val taskSetManager000 = createTaskSetManager(0, 5, taskScheduler)
+    val taskSetManager001 = createTaskSetManager(1, 5, taskScheduler)
+    pool00.addSchedulable(taskSetManager000)
+    pool00.addSchedulable(taskSetManager001)
+
+    val taskSetManager010 = createTaskSetManager(2, 5, taskScheduler)
+    val taskSetManager011 = createTaskSetManager(3, 5, taskScheduler)
+    pool01.addSchedulable(taskSetManager010)
+    pool01.addSchedulable(taskSetManager011)
+
+    val taskSetManager100 = createTaskSetManager(4, 5, taskScheduler)
+    val taskSetManager101 = createTaskSetManager(5, 5, taskScheduler)
+    pool10.addSchedulable(taskSetManager100)
+    pool10.addSchedulable(taskSetManager101)
+
+    val taskSetManager110 = createTaskSetManager(6, 5, taskScheduler)
+    val taskSetManager111 = createTaskSetManager(7, 5, taskScheduler)
+    pool11.addSchedulable(taskSetManager110)
+    pool11.addSchedulable(taskSetManager111)
+
+    scheduleTaskAndVerifyId(0, rootPool, 0)
+    scheduleTaskAndVerifyId(1, rootPool, 4)
+    scheduleTaskAndVerifyId(2, rootPool, 6)
+    scheduleTaskAndVerifyId(3, rootPool, 2)
+  }
+}

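For reference, the orderings asserted in the fair-scheduler test above map onto
the comparison rules its inline comments describe: a pool running fewer tasks
than its minShare is "needy" and is preferred; among needy pools the lower
runningTasks/minShare ratio wins; among satisfied pools the lower weighted
running-task count wins; pool names break remaining ties. A standalone sketch
of that ordering (an illustration of the behavior the test comments describe,
not a copy of Spark's FairSchedulingAlgorithm):

    import org.apache.spark.scheduler.Schedulable

    // Returns true if s1 should be scheduled ahead of s2.
    def fairOrdering(s1: Schedulable, s2: Schedulable): Boolean = {
      val needy1 = s1.runningTasks < s1.minShare
      val needy2 = s2.runningTasks < s2.minShare
      val shareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
      val shareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
      val weighted1 = s1.runningTasks.toDouble / s1.weight
      val weighted2 = s2.runningTasks.toDouble / s2.weight
      if (needy1 != needy2) {
        needy1                            // a needy pool always wins
      } else if (needy1 && shareRatio1 != shareRatio2) {
        shareRatio1 < shareRatio2         // both needy: lower share ratio first
      } else if (!needy1 && weighted1 != weighted2) {
        weighted1 < weighted2             // both satisfied: fewer weighted running tasks
      } else {
        s1.name < s2.name                 // tie-break on pool name
      }
    }

The same comparison applies recursively in the nested-pool test: the root pool
orders pool0 against pool1, and each pool then orders its own children, which
is why stages 0, 4, 6, and 2 are scheduled in that order there.
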
http://git-wip-us.apache.org/repos/asf/spark/blob/b6aa5573/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 00812e6..8874cf0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -30,238 +30,8 @@ class FakeSchedulerBackend extends SchedulerBackend {
   def defaultParallelism() = 1
 }
 
-class FakeTaskSetManager(
-    initPriority: Int,
-    initStageId: Int,
-    initNumTasks: Int,
-    taskScheduler: TaskSchedulerImpl,
-    taskSet: TaskSet)
-  extends TaskSetManager(taskScheduler, taskSet, 0) {
-
-  parent = null
-  weight = 1
-  minShare = 2
-  priority = initPriority
-  stageId = initStageId
-  name = "TaskSet_"+stageId
-  override val numTasks = initNumTasks
-  tasksSuccessful = 0
-
-  var numRunningTasks = 0
-  override def runningTasks = numRunningTasks
-
-  def increaseRunningTasks(taskNum: Int) {
-    numRunningTasks += taskNum
-    if (parent != null) {
-      parent.increaseRunningTasks(taskNum)
-    }
-  }
-
-  def decreaseRunningTasks(taskNum: Int) {
-    numRunningTasks -= taskNum
-    if (parent != null) {
-      parent.decreaseRunningTasks(taskNum)
-    }
-  }
-
-  override def addSchedulable(schedulable: Schedulable) {
-  }
-
-  override def removeSchedulable(schedulable: Schedulable) {
-  }
-
-  override def getSchedulableByName(name: String): Schedulable = {
-    null
-  }
-
-  override def executorLost(executorId: String, host: String): Unit = {
-  }
-
-  override def resourceOffer(
-      execId: String,
-      host: String,
-      maxLocality: TaskLocality.TaskLocality)
-    : Option[TaskDescription] =
-  {
-    if (tasksSuccessful + numRunningTasks < numTasks) {
-      increaseRunningTasks(1)
-      Some(new TaskDescription(0, execId, "task 0:0", 0, null))
-    } else {
-      None
-    }
-  }
-
-  override def checkSpeculatableTasks(): Boolean = {
-    true
-  }
-
-  def taskFinished() {
-    decreaseRunningTasks(1)
-    tasksSuccessful +=1
-    if (tasksSuccessful == numTasks) {
-      parent.removeSchedulable(this)
-    }
-  }
-
-  def abort() {
-    decreaseRunningTasks(numRunningTasks)
-    parent.removeSchedulable(this)
-  }
-}
-
 class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
 
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl,
-      taskSet: TaskSet): FakeTaskSetManager = {
-    new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
-  }
-
-  def resourceOffer(rootPool: Pool): Int = {
-    val taskSetQueue = rootPool.getSortedTaskSetQueue
-    /* Just for Test*/
-    for (manager <- taskSetQueue) {
-       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
-         manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
-    }
-    for (taskSet <- taskSetQueue) {
-      taskSet.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY) match {
-        case Some(task) =>
-          return taskSet.stageId
-        case None => {}
-      }
-    }
-    -1
-  }
-
-  def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
-    assert(resourceOffer(rootPool) === expectedTaskSetId)
-  }
-
-  test("FIFO Scheduler Test") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    val taskSet = FakeTask.createTaskSet(1)
-
-    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
-    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
-    schedulableBuilder.buildPools()
-
-    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet)
-    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet)
-    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet)
-    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
-    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
-    schedulableBuilder.addTaskSetManager(taskSetManager2, null)
-
-    checkTaskSetId(rootPool, 0)
-    resourceOffer(rootPool)
-    checkTaskSetId(rootPool, 1)
-    resourceOffer(rootPool)
-    taskSetManager1.abort()
-    checkTaskSetId(rootPool, 2)
-  }
-
-  test("Fair Scheduler Test") {
-    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    val taskSet = FakeTask.createTaskSet(1)
-
-    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
-    schedulableBuilder.buildPools()
-
-    assert(rootPool.getSchedulableByName("default") != null)
-    assert(rootPool.getSchedulableByName("1") != null)
-    assert(rootPool.getSchedulableByName("2") != null)
-    assert(rootPool.getSchedulableByName("3") != null)
-    assert(rootPool.getSchedulableByName("1").minShare === 2)
-    assert(rootPool.getSchedulableByName("1").weight === 1)
-    assert(rootPool.getSchedulableByName("2").minShare === 3)
-    assert(rootPool.getSchedulableByName("2").weight === 1)
-    assert(rootPool.getSchedulableByName("3").minShare === 0)
-    assert(rootPool.getSchedulableByName("3").weight === 1)
-
-    val properties1 = new Properties()
-    properties1.setProperty("spark.scheduler.pool","1")
-    val properties2 = new Properties()
-    properties2.setProperty("spark.scheduler.pool","2")
-
-    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet)
-    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet)
-    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet)
-    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
-    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
-    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
-
-    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet)
-    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet)
-    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
-    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
-
-    checkTaskSetId(rootPool, 0)
-    checkTaskSetId(rootPool, 3)
-    checkTaskSetId(rootPool, 3)
-    checkTaskSetId(rootPool, 1)
-    checkTaskSetId(rootPool, 4)
-    checkTaskSetId(rootPool, 2)
-    checkTaskSetId(rootPool, 2)
-    checkTaskSetId(rootPool, 4)
-
-    taskSetManager12.taskFinished()
-    assert(rootPool.getSchedulableByName("1").runningTasks === 3)
-    taskSetManager24.abort()
-    assert(rootPool.getSchedulableByName("2").runningTasks === 2)
-  }
-
-  test("Nested Pool Test") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    val taskSet = FakeTask.createTaskSet(1)
-
-    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
-    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
-    rootPool.addSchedulable(pool0)
-    rootPool.addSchedulable(pool1)
-
-    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
-    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
-    pool0.addSchedulable(pool00)
-    pool0.addSchedulable(pool01)
-
-    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
-    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
-    pool1.addSchedulable(pool10)
-    pool1.addSchedulable(pool11)
-
-    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet)
-    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet)
-    pool00.addSchedulable(taskSetManager000)
-    pool00.addSchedulable(taskSetManager001)
-
-    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet)
-    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet)
-    pool01.addSchedulable(taskSetManager010)
-    pool01.addSchedulable(taskSetManager011)
-
-    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet)
-    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet)
-    pool10.addSchedulable(taskSetManager100)
-    pool10.addSchedulable(taskSetManager101)
-
-    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet)
-    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet)
-    pool11.addSchedulable(taskSetManager110)
-    pool11.addSchedulable(taskSetManager111)
-
-    checkTaskSetId(rootPool, 0)
-    checkTaskSetId(rootPool, 4)
-    checkTaskSetId(rootPool, 6)
-    checkTaskSetId(rootPool, 2)
-  }
-
   test("Scheduler does not always schedule tasks on the same workers") {
     sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskScheduler = new TaskSchedulerImpl(sc)

