Posted to commits@spark.apache.org by ir...@apache.org on 2016/11/28 19:51:37 UTC

spark git commit: [SPARK-18117][CORE] Add test for TaskSetBlacklist

Repository: spark
Updated Branches:
  refs/heads/master ad67993b7 -> 8b1609beb


[SPARK-18117][CORE] Add test for TaskSetBlacklist

## What changes were proposed in this pull request?

This adds tests to verify the interaction between TaskSetBlacklist and
TaskSchedulerImpl.  TaskSetBlacklist was introduced by SPARK-17675, but
that change neglected to add these tests.

This change does not fix any bugs -- it only increases test coverage.
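
As a rough sketch of the pattern these tests follow (setupSchedulerWithMockTaskSetBlacklist
and stageToMockTaskSetBlacklist are helpers defined in the suite added below; this snippet
is an illustration of the approach, not extra test code):

    // Build a scheduler whose per-stage TaskSetBlacklist is a Mockito mock, stub the mock
    // to blacklist one node, and check that offers on that node are skipped.
    taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    taskScheduler.submitTasks(FakeTask.createTaskSet(numTasks = 2, stageId = 0, stageAttemptId = 0))
    when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet("host1")).thenReturn(true)
    val taskDescs = taskScheduler.resourceOffers(IndexedSeq(
      new WorkerOffer("executor0", "host0", 1),
      new WorkerOffer("executor1", "host1", 1))).flatten
    // Nothing should have been placed on the blacklisted node.
    assert(taskDescs.forall(_.executorId != "executor1"))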

## How was this patch tested?

Jenkins

Author: Imran Rashid <ir...@cloudera.com>

Closes #15644 from squito/taskset_blacklist_test_update.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b1609be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b1609be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b1609be

Branch: refs/heads/master
Commit: 8b1609bebe489b2ef78db4be6e9836687089fe3d
Parents: ad67993
Author: Imran Rashid <ir...@cloudera.com>
Authored: Mon Nov 28 13:47:09 2016 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Mon Nov 28 13:47:09 2016 -0600

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala |   2 +-
 .../scheduler/TaskSchedulerImplSuite.scala      | 254 ++++++++++++++++++-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  45 +++-
 3 files changed, 292 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b766e41..f2a432c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -84,7 +84,7 @@ private[spark] class TaskSetManager(
   var totalResultSize = 0L
   var calculatedTasks = 0
 
-  private val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
+  private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
     if (BlacklistTracker.isBlacklistEnabled(conf)) {
       Some(new TaskSetBlacklist(conf, stageId, clock))
     } else {
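
The one production change above widens taskSetBlacklistHelperOpt from private to
private[scheduler]; its sole purpose is to let the new suites swap in a mocked
TaskSetBlacklist.  A minimal sketch of that injection, assuming Mockito's spy, mock
and when as imported in the suites below:

    // Spy on a real TaskSetManager and stub the now package-visible accessor, so the
    // scheduler code under test talks to a mock TaskSetBlacklist instead of a real one.
    val tsmSpy = spy(tsm)                              // tsm is a real TaskSetManager
    val blacklist = mock(classOf[TaskSetBlacklist])
    when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(blacklist))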

http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index f5f1947..5dc7708 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -17,7 +17,12 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.HashMap
+
+import org.mockito.Matchers.{anyInt, anyString, eq => meq}
+import org.mockito.Mockito.{atLeast, atMost, never, spy, verify, when}
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -31,7 +36,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
 }
 
 class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
-    with Logging {
+    with Logging with MockitoSugar {
 
   var failedTaskSetException: Option[Throwable] = None
   var failedTaskSetReason: String = null
@@ -40,11 +45,16 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   var taskScheduler: TaskSchedulerImpl = null
   var dagScheduler: DAGScheduler = null
 
+  val stageToMockTaskSetBlacklist = new HashMap[Int, TaskSetBlacklist]()
+  val stageToMockTaskSetManager = new HashMap[Int, TaskSetManager]()
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     failedTaskSet = false
     failedTaskSetException = None
     failedTaskSetReason = null
+    stageToMockTaskSetBlacklist.clear()
+    stageToMockTaskSetManager.clear()
   }
 
   override def afterEach(): Unit = {
@@ -66,6 +76,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
     sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
+    setupHelper()
+  }
+
+  def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = {
+    val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
+    conf.set(config.BLACKLIST_ENABLED, true)
+    sc = new SparkContext(conf)
+    taskScheduler =
+      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
+        override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
+          val tsm = super.createTaskSetManager(taskSet, maxFailures)
+          // we need to create a spied tsm just so we can set the TaskSetBlacklist
+          val tsmSpy = spy(tsm)
+          val taskSetBlacklist = mock[TaskSetBlacklist]
+          when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(taskSetBlacklist))
+          stageToMockTaskSetManager(taskSet.stageId) = tsmSpy
+          stageToMockTaskSetBlacklist(taskSet.stageId) = taskSetBlacklist
+          tsmSpy
+        }
+      }
+    setupHelper()
+  }
+
+  def setupHelper(): TaskSchedulerImpl = {
     taskScheduler.initialize(new FakeSchedulerBackend)
     // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
     dagScheduler = new DAGScheduler(sc, taskScheduler) {
@@ -282,6 +316,211 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("scheduled tasks obey task and stage blacklists") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+    (0 to 2).foreach {stageId =>
+      val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId = stageId, stageAttemptId = 0)
+      taskScheduler.submitTasks(taskSet)
+    }
+
+    // Setup our mock blacklist:
+    // * stage 0 is blacklisted on node "host1"
+    // * stage 1 is blacklisted on executor "executor3"
+    // * stage 0, partition 0 is blacklisted on executor 0
+    // (mocked methods default to returning false, i.e. no blacklisting)
+    when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet("host1")).thenReturn(true)
+    when(stageToMockTaskSetBlacklist(1).isExecutorBlacklistedForTaskSet("executor3"))
+      .thenReturn(true)
+    when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", 0))
+      .thenReturn(true)
+
+    val offers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1),
+      new WorkerOffer("executor2", "host1", 1),
+      new WorkerOffer("executor3", "host2", 10)
+    )
+    val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+    // We should schedule all tasks.
+    assert(firstTaskAttempts.size === 6)
+    // Whenever we schedule a task, we must consult the node and executor blacklist.  (The test
+    // doesn't check exactly what checks are made because the offers get shuffled.)
+    (0 to 2).foreach { stageId =>
+      verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+        .isNodeBlacklistedForTaskSet(anyString())
+      verify(stageToMockTaskSetBlacklist(stageId), atLeast(1))
+        .isExecutorBlacklistedForTaskSet(anyString())
+    }
+
+    def tasksForStage(stageId: Int): Seq[TaskDescription] = {
+      firstTaskAttempts.filter{_.name.contains(s"stage $stageId")}
+    }
+    tasksForStage(0).foreach { task =>
+      // executors 1 & 2 blacklisted for node
+      // executor 0 blacklisted just for partition 0
+      if (task.index == 0) {
+        assert(task.executorId === "executor3")
+      } else {
+        assert(Set("executor0", "executor3").contains(task.executorId))
+      }
+    }
+    tasksForStage(1).foreach { task =>
+      // executor 3 blacklisted
+      assert("executor3" != task.executorId)
+    }
+    // no restrictions on stage 2
+
+    // Finally, just make sure that we can still complete tasks as usual with blacklisting
+    // in effect.  Finish each of the tasksets -- taskset 0 & 1 complete successfully, taskset 2
+    // fails.
+    (0 to 2).foreach { stageId =>
+      val tasks = tasksForStage(stageId)
+      val tsm = taskScheduler.taskSetManagerForAttempt(stageId, 0).get
+      val valueSer = SparkEnv.get.serializer.newInstance()
+      if (stageId == 2) {
+        // Just need to make one task fail 4 times.
+        var task = tasks(0)
+        val taskIndex = task.index
+        (0 until 4).foreach { attempt =>
+          assert(task.attemptNumber === attempt)
+          tsm.handleFailedTask(task.taskId, TaskState.FAILED, TaskResultLost)
+          val nextAttempts =
+            taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor4", "host4", 1))).flatten
+          if (attempt < 3) {
+            assert(nextAttempts.size === 1)
+            task = nextAttempts(0)
+            assert(task.index === taskIndex)
+          } else {
+            assert(nextAttempts.size === 0)
+          }
+        }
+        // End the other task of the taskset; it doesn't matter whether it succeeds or fails.
+        val otherTask = tasks(1)
+        val result = new DirectTaskResult[Int](valueSer.serialize(otherTask.taskId), Seq())
+        tsm.handleSuccessfulTask(otherTask.taskId, result)
+      } else {
+        tasks.foreach { task =>
+          val result = new DirectTaskResult[Int](valueSer.serialize(task.taskId), Seq())
+          tsm.handleSuccessfulTask(task.taskId, result)
+        }
+      }
+      assert(tsm.isZombie)
+    }
+  }
+
+  /**
+   * Helper for performance tests.  Takes the explicitly blacklisted nodes and executors; verifies
+   * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks).
+   * Creates 1 offer on executor[1-3].  Executor1 & 2 are on host1, executor3 is on host2.  The
+   * passed-in nodes and executors should be among those.
+   */
+  private def testBlacklistPerformance(
+      testName: String,
+      nodeBlacklist: Seq[String],
+      execBlacklist: Seq[String]): Unit = {
+    // Because scheduling involves shuffling the order of offers around, we run this test a few
+    // times to cover more possibilities.  There are only 3 offers, which means 6 permutations,
+    // so 10 iterations is pretty good.
+    (0 until 10).foreach { testItr =>
+      test(s"$testName: iteration $testItr") {
+        // When an executor or node is blacklisted, we want to make sure that we don't try
+        // scheduling each pending task, one by one, to discover they are all blacklisted.  This is
+        // important for performance -- if we did check each task one-by-one, then responding to a
+        // resource offer (which is usually O(1)-ish) would become O(numPendingTasks), which would
+        // slow down scheduler throughput and slow down scheduling even on healthy executors.
+        // Here, we check a proxy for the runtime -- we make sure the scheduling is short-circuited
+        // at the node or executor blacklist, so we never check the per-task blacklist.  We also
+        // make sure we don't check the node & executor blacklist for the entire taskset
+        // O(numPendingTasks) times.
+
+        taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+        // we schedule 500 tasks so we can clearly distinguish anything that is O(numPendingTasks)
+        val taskSet = FakeTask.createTaskSet(numTasks = 500, stageId = 0, stageAttemptId = 0)
+        taskScheduler.submitTasks(taskSet)
+
+        val offers = IndexedSeq(
+          new WorkerOffer("executor1", "host1", 1),
+          new WorkerOffer("executor2", "host1", 1),
+          new WorkerOffer("executor3", "host2", 1)
+        )
+        // We should check the node & exec blacklists, but only O(numOffers), not O(numPendingTasks)
+        // times.  In the worst case, after shuffling, we offer our blacklisted resource first, and
+        // then offer other resources which do get used.  The taskset blacklist is consulted
+        // repeatedly as we offer resources to the taskset -- each iteration either schedules
+        // something, or it terminates that locality level, so the maximum number of checks is
+        // numCores + numLocalityLevels
+        val numCoresOnAllOffers = offers.map(_.cores).sum
+        val numLocalityLevels = TaskLocality.values.size
+        val maxBlacklistChecks = numCoresOnAllOffers + numLocalityLevels
+
+        // Setup the blacklist
+        nodeBlacklist.foreach { node =>
+          when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet(node)).thenReturn(true)
+        }
+        execBlacklist.foreach { exec =>
+          when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTaskSet(exec))
+            .thenReturn(true)
+        }
+
+        // Figure out which nodes have any effective blacklisting on them.  This means all nodes
+        // that are explicitly blacklisted, plus those that have *any* executors blacklisted.
+        val nodesForBlacklistedExecutors = offers.filter { offer =>
+          execBlacklist.contains(offer.executorId)
+        }.map(_.host).toSet.toSeq
+        val nodesWithAnyBlacklisting = (nodeBlacklist ++ nodesForBlacklistedExecutors).toSet
+        // Similarly, figure out which executors have any blacklisting.  This means all executors
+        // that are explicitly blacklisted, plus all executors on nodes that are blacklisted.
+        val execsForBlacklistedNodes = offers.filter { offer =>
+          nodeBlacklist.contains(offer.host)
+        }.map(_.executorId).toSeq
+        val executorsWithAnyBlacklisting = (execBlacklist ++ execsForBlacklistedNodes).toSet
+
+        // Schedule a taskset, and make sure our test setup is correct -- we are able to schedule
+        // a task on all executors that aren't blacklisted (whether that executor is explicitly
+        // blacklisted, or implicitly blacklisted via the node blacklist).
+        val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+        assert(firstTaskAttempts.size === offers.size - executorsWithAnyBlacklisting.size)
+
+        // Now check that we haven't made too many calls to any of the blacklist methods.
+        // We should be checking our node blacklist, but it should be within the bound we defined
+        // above.
+        verify(stageToMockTaskSetBlacklist(0), atMost(maxBlacklistChecks))
+          .isNodeBlacklistedForTaskSet(anyString())
+        // We shouldn't ever consult the per-task blacklist for the nodes that have been blacklisted
+        // for the entire taskset, since the taskset level blacklisting should prevent scheduling
+        // from ever looking at specific tasks.
+        nodesWithAnyBlacklisting.foreach { node =>
+          verify(stageToMockTaskSetBlacklist(0), never)
+            .isNodeBlacklistedForTask(meq(node), anyInt())
+        }
+        executorsWithAnyBlacklisting.foreach { exec =>
+          // We should be checking our executor blacklist, but it should be within the bound defined
+          // above.  It's possible that this will be significantly fewer calls, maybe even 0, if
+          // there is also a node blacklist which takes effect first.  But this assert is all we
+          // need to avoid an O(numPendingTasks) slowdown.
+          verify(stageToMockTaskSetBlacklist(0), atMost(maxBlacklistChecks))
+            .isExecutorBlacklistedForTaskSet(exec)
+          // We shouldn't ever consult the per-task blacklist for executors that have been
+          // blacklisted for the entire taskset, since the taskset level blacklisting should prevent
+          // scheduling from ever looking at specific tasks.
+          verify(stageToMockTaskSetBlacklist(0), never)
+            .isExecutorBlacklistedForTask(meq(exec), anyInt())
+        }
+      }
+    }
+  }
+
+  testBlacklistPerformance(
+    testName = "Blacklisted node for entire task set prevents per-task blacklist checks",
+    nodeBlacklist = Seq("host1"),
+    execBlacklist = Seq())
+
+  testBlacklistPerformance(
+    testName = "Blacklisted executor for entire task set prevents per-task blacklist checks",
+    nodeBlacklist = Seq(),
+    execBlacklist = Seq("executor3")
+  )
+
   test("abort stage if executor loss results in unschedulability from previously failed tasks") {
     // Make sure we can detect when a taskset becomes unschedulable from a blacklisting.  This
     // test explores a particular corner case -- you may have one task fail, but still be
@@ -301,27 +540,27 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     )).flatten
     assert(Set("executor0", "executor1") === firstTaskAttempts.map(_.executorId).toSet)
 
-    // fail one of the tasks, but leave the other running
+    // Fail one of the tasks, but leave the other running.
     val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
     taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
-    // at this point, our failed task could run on the other executor, so don't give up the task
+    // At this point, our failed task could run on the other executor, so don't give up the task
     // set yet.
     assert(!failedTaskSet)
 
     // Now we fail our second executor.  The other task can still run on executor1, so make an offer
-    // on that executor, and make sure that the other task (not the failed one) is assigned there
+    // on that executor, and make sure that the other task (not the failed one) is assigned there.
     taskScheduler.executorLost("executor1", SlaveLost("oops"))
     val nextTaskAttempts =
       taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten
    // Note: It's OK if some future change makes this already realize the taskset has become
-    // unschedulable at this point (though in the current implementation, we're sure it will not)
+    // unschedulable at this point (though in the current implementation, we're sure it will not).
     assert(nextTaskAttempts.size === 1)
     assert(nextTaskAttempts.head.executorId === "executor0")
     assert(nextTaskAttempts.head.attemptNumber === 1)
     assert(nextTaskAttempts.head.index != failedTask.index)
 
-    // now we should definitely realize that our task set is unschedulable, because the only
-    // task left can't be scheduled on any executors due to the blacklist
+    // Now we should definitely realize that our task set is unschedulable, because the only
+    // task left can't be scheduled on any executors due to the blacklist.
     taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
     sc.listenerBus.waitUntilEmpty(100000)
     assert(tsm.isZombie)
@@ -408,4 +647,5 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(thirdTaskDescs.size === 0)
     assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
   }
+
 }
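
A worked instance of the bound used in testBlacklistPerformance above, for the three
single-core offers created there (assuming TaskLocality has its usual five levels:
PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY):

    // Sketch: the taskset-level blacklist may be consulted at most once per offered core
    // plus once per locality level, independent of the 500 pending tasks.
    val numCoresOnAllOffers = 1 + 1 + 1                      // executor1, executor2, executor3
    val numLocalityLevels = TaskLocality.values.size         // 5
    val maxBlacklistChecks = numCoresOnAllOffers + numLocalityLevels   // 3 + 5 = 8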

http://git-wip-us.apache.org/repos/asf/spark/blob/8b1609be/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 1b1a764..abc8fff 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -22,11 +22,13 @@ import java.util.Random
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.mockito.Mockito.{mock, verify}
+import org.mockito.Matchers.{anyInt, anyString}
+import org.mockito.Mockito.{mock, never, spy, verify, when}
 
 import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
+import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, ManualClock}
 
 class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
@@ -992,6 +994,47 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager3.name === "TaskSet_1.1")
   }
 
+  test("don't update blacklist for shuffle-fetch failures, preemption, denied commits, " +
+      "or killed tasks") {
+    // Setup a taskset, and fail some tasks for a fetch failure, preemption, denied commit,
+    // and killed task.
+    val conf = new SparkConf().
+      set(config.BLACKLIST_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    val tsm = new TaskSetManager(sched, taskSet, 4)
+    // we need a spy so we can attach our mock blacklist
+    val tsmSpy = spy(tsm)
+    val blacklist = mock(classOf[TaskSetBlacklist])
+    when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(blacklist))
+
+    // make some offers to our taskset, to get tasks we will fail
+    val taskDescs = Seq(
+      "exec1" -> "host1",
+      "exec2" -> "host1"
+    ).flatMap { case (exec, host) =>
+      // offer each executor twice (simulating 2 cores per executor)
+      (0 until 2).flatMap{ _ => tsmSpy.resourceOffer(exec, host, TaskLocality.ANY)}
+    }
+    assert(taskDescs.size === 4)
+
+    // now fail those tasks
+    tsmSpy.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED,
+      FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored"))
+    tsmSpy.handleFailedTask(taskDescs(1).taskId, TaskState.FAILED,
+      ExecutorLostFailure(taskDescs(1).executorId, exitCausedByApp = false, reason = None))
+    tsmSpy.handleFailedTask(taskDescs(2).taskId, TaskState.FAILED,
+      TaskCommitDenied(0, 2, 0))
+    tsmSpy.handleFailedTask(taskDescs(3).taskId, TaskState.KILLED,
+      TaskKilled)
+
+    // Make sure that the blacklist ignored all of the task failures above, since they aren't
+    // the fault of the executor where the task was running.
+    verify(blacklist, never())
+      .updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
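
The assertion in the last test rests on the idea that none of those failure kinds are the
fault of the executor the task ran on.  In the Spark sources this is expressed through
TaskFailedReason.countTowardsTaskFailures (to the best of my reading -- treat the exact
gating as an assumption, not something this commit changes), which each of the exercised
reasons reports as false:

    // Sketch (assumption): every reason used in the test opts out of failure counting,
    // which is why the mocked blacklist never sees updateBlacklistForFailedTask.
    val reasons: Seq[TaskFailedReason] = Seq(
      FetchFailed(BlockManagerId("exec1", "host1", 12345), 0, 0, 0, "ignored"),
      ExecutorLostFailure("exec2", exitCausedByApp = false, reason = None),
      TaskCommitDenied(0, 2, 0),
      TaskKilled)
    assert(reasons.forall(!_.countTowardsTaskFailures))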

