Posted to commits@spark.apache.org by we...@apache.org on 2017/08/23 03:31:59 UTC
spark git commit: [SPARK-19326] Speculated task attempts do not get launched in few scenarios
Repository: spark
Updated Branches:
refs/heads/master 342961905 -> d58a3507e
[SPARK-19326] Speculated task attempts do not get launched in few scenarios
## What changes were proposed in this pull request?
Add a new listener event that is posted when a speculative task is created, and notify the ExecutorAllocationManager so that it can request more executors.
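For listener authors, the new event can be consumed like any other SparkListener callback. A minimal sketch (not part of this patch; the class name and logging are illustrative only):

import org.apache.spark.scheduler.{SparkListener, SparkListenerSpeculativeTaskSubmitted}

class SpeculationLoggingListener extends SparkListener {
  override def onSpeculativeTaskSubmitted(
      speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = {
    // stageId is the only field carried by the new event introduced in this patch.
    println(s"Speculative task submitted for stage ${speculativeTask.stageId}")
  }
}

It can then be registered on an existing SparkContext, e.g. sc.addSparkListener(new SpeculationLoggingListener).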
## How was this patch tested?
- Added unit tests.
- For the test snippet in the JIRA:
val n = 100
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 1) {
    Thread.sleep(Long.MaxValue) // fake long running task(s)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
With this code change, Spark indicates 101 task attempts for the stage (99 succeeded, 2 running, one of which is the speculative attempt).
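The scenario above assumes both speculation and dynamic allocation are enabled. A sketch of the relevant configuration (the values shown are illustrative, not prescriptive):

./bin/spark-shell \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.maxExecutors=10 \
  --conf spark.speculation=true

Without this patch, the speculative attempt for the stuck partition may never be launched, because ExecutorAllocationManager did not count speculative tasks when sizing its executor target.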
Author: Jane Wang <ja...@fb.com>
Closes #18492 from janewangfb/speculated_task_not_launched.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d58a3507
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d58a3507
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d58a3507
Branch: refs/heads/master
Commit: d58a3507ed2d48eabb857c92aecead19a52f4952
Parents: 3429619
Author: Jane Wang <ja...@fb.com>
Authored: Wed Aug 23 11:31:54 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Aug 23 11:31:54 2017 +0800
----------------------------------------------------------------------
.../org/apache/spark/SparkFirehoseListener.java | 5 ++
.../spark/ExecutorAllocationManager.scala | 61 +++++++++++++++++---
.../apache/spark/scheduler/DAGScheduler.scala | 14 +++++
.../spark/scheduler/DAGSchedulerEvent.scala | 4 ++
.../apache/spark/scheduler/SparkListener.scala | 11 ++++
.../spark/scheduler/SparkListenerBus.scala | 2 +
.../apache/spark/scheduler/TaskSetManager.scala | 1 +
.../spark/ExecutorAllocationManagerSuite.scala | 48 ++++++++++++++-
.../spark/scheduler/TaskSetManagerSuite.scala | 9 +++
9 files changed, 144 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 140c52f..3583856 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -140,6 +140,11 @@ public class SparkFirehoseListener implements SparkListenerInterface {
}
@Override
+ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
+ onEvent(speculativeTask);
+ }
+
+ @Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 337631a..3350326 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager(
// If our target has not changed, do not send a message
// to the cluster manager and reset our exponential growth
if (delta == 0) {
- numExecutorsToAdd = 1
- return 0
+ // Check if there are any speculative tasks pending
+ if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
+ numExecutorsTarget =
+ math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
+ } else {
+ numExecutorsToAdd = 1
+ return 0
+ }
}
val addRequestAcknowledged = try {
@@ -588,17 +594,22 @@ private[spark] class ExecutorAllocationManager(
* A listener that notifies the given allocation manager of when to add and remove executors.
*
* This class is intentionally conservative in its assumptions about the relative ordering
- * and consistency of events returned by the listener. For simplicity, it does not account
- * for speculated tasks.
+ * and consistency of events returned by the listener.
*/
private class ExecutorAllocationListener extends SparkListener {
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
- // Number of tasks currently running on the cluster. Should be 0 when no stages are active.
+ // Number of tasks currently running on the cluster including speculative tasks.
+ // Should be 0 when no stages are active.
private var numRunningTasks: Int = _
+ // Number of speculative tasks to be scheduled in each stage
+ private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
+ // The speculative tasks started in each stage
+ private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
+
// stageId to tuple (the number of task with locality preferences, a map where each pair is a
// node and the number of tasks that would like to be scheduled on that node) map,
// maintain the executor placement hints for each stage Id used by resource framework to better
@@ -637,7 +648,9 @@ private[spark] class ExecutorAllocationManager(
val stageId = stageCompleted.stageInfo.stageId
allocationManager.synchronized {
stageIdToNumTasks -= stageId
+ stageIdToNumSpeculativeTasks -= stageId
stageIdToTaskIndices -= stageId
+ stageIdToSpeculativeTaskIndices -= stageId
stageIdToExecutorPlacementHints -= stageId
// Update the executor placement hints
@@ -645,7 +658,7 @@ private[spark] class ExecutorAllocationManager(
// If this is the last stage with pending tasks, mark the scheduler queue as empty
// This is needed in case the stage is aborted for any reason
- if (stageIdToNumTasks.isEmpty) {
+ if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
if (numRunningTasks != 0) {
logWarning("No stages are running, but numRunningTasks != 0")
@@ -671,7 +684,12 @@ private[spark] class ExecutorAllocationManager(
}
// If this is the last pending task, mark the scheduler queue as empty
- stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+ if (taskStart.taskInfo.speculative) {
+ stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) +=
+ taskIndex
+ } else {
+ stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+ }
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerQueueEmpty()
}
@@ -705,7 +723,11 @@ private[spark] class ExecutorAllocationManager(
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerBacklogged()
}
- stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) }
+ if (taskEnd.taskInfo.speculative) {
+ stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+ } else {
+ stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+ }
}
}
}
@@ -726,18 +748,39 @@ private[spark] class ExecutorAllocationManager(
allocationManager.onExecutorRemoved(executorRemoved.executorId)
}
+ override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
+ : Unit = {
+ val stageId = speculativeTask.stageId
+
+ allocationManager.synchronized {
+ stageIdToNumSpeculativeTasks(stageId) =
+ stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+ allocationManager.onSchedulerBacklogged()
+ }
+ }
+
/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
- def totalPendingTasks(): Int = {
+ def pendingTasks(): Int = {
stageIdToNumTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}
+ def pendingSpeculativeTasks(): Int = {
+ stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) =>
+ numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
+ }.sum
+ }
+
+ def totalPendingTasks(): Int = {
+ pendingTasks + pendingSpeculativeTasks
+ }
+
/**
* The number of tasks currently running across all stages.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 21bf9d0..562dd1d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -281,6 +281,13 @@ class DAGScheduler(
eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
}
+ /**
+ * Called by the TaskSetManager when it decides a speculative task is needed.
+ */
+ def speculativeTaskSubmitted(task: Task[_]): Unit = {
+ eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+ }
+
private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
@@ -812,6 +819,10 @@ class DAGScheduler(
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
+ private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
+ listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
+ }
+
private[scheduler] def handleTaskSetFailed(
taskSet: TaskSet,
reason: String,
@@ -1778,6 +1789,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
+ case SpeculativeTaskSubmitted(task) =>
+ dagScheduler.handleSpeculativeTaskSubmitted(task)
+
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 3f8d563..54ab8f8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -94,3 +94,7 @@ case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Thr
extends DAGSchedulerEvent
private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
+
+private[scheduler]
+case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent
+
http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 59f89a8..b76e560 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -53,6 +53,9 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
@DeveloperApi
+case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
+
+@DeveloperApi
case class SparkListenerTaskEnd(
stageId: Int,
stageAttemptId: Int,
@@ -291,6 +294,11 @@ private[spark] trait SparkListenerInterface {
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
/**
+ * Called when a speculative task is submitted
+ */
+ def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
+
+ /**
* Called when other events like SQL-specific events are posted.
*/
def onOtherEvent(event: SparkListenerEvent): Unit
@@ -354,5 +362,8 @@ abstract class SparkListener extends SparkListenerInterface {
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
+ override def onSpeculativeTaskSubmitted(
+ speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }
+
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 3b0d3b1..056c0cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -71,6 +71,8 @@ private[spark] trait SparkListenerBus
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
+ case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
+ listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case _ => listener.onOtherEvent(event)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c2f8178..3804ea8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -966,6 +966,7 @@ private[spark] class TaskSetManager(
"Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
.format(index, taskSet.id, info.host, threshold))
speculatableTasks += index
+ sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
foundTasks = true
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index b9ce71a..7da4bae 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsTarget(manager) === 10)
}
+ test("add executors when speculative tasks added") {
+ sc = createSparkContext(0, 10, 0)
+ val manager = sc.executorAllocationManager.get
+
+ // Verify that we're capped at number of tasks including the speculative ones in the stage
+ sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+ assert(numExecutorsTarget(manager) === 0)
+ assert(numExecutorsToAdd(manager) === 1)
+ assert(addExecutors(manager) === 1)
+ sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+ sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+ assert(numExecutorsTarget(manager) === 1)
+ assert(numExecutorsToAdd(manager) === 2)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsTarget(manager) === 3)
+ assert(numExecutorsToAdd(manager) === 4)
+ assert(addExecutors(manager) === 2)
+ assert(numExecutorsTarget(manager) === 5)
+ assert(numExecutorsToAdd(manager) === 1)
+
+ // Verify that running a task doesn't affect the target
+ sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
+ assert(numExecutorsTarget(manager) === 5)
+ assert(addExecutors(manager) === 0)
+ assert(numExecutorsToAdd(manager) === 1)
+
+ // Verify that running a speculative task doesn't affect the target
+ sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
+ assert(numExecutorsTarget(manager) === 5)
+ assert(addExecutors(manager) === 0)
+ assert(numExecutorsToAdd(manager) === 1)
+ }
+
test("cancel pending executors when no longer needed") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
@@ -1031,10 +1065,15 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
taskLocalityPreferences = taskLocalityPreferences)
}
- private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
- new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
+ private def createTaskInfo(
+ taskId: Int,
+ taskIndex: Int,
+ executorId: String,
+ speculative: Boolean = false): TaskInfo = {
+ new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative)
}
+
/* ------------------------------------------------------- *
| Helper methods for accessing private methods and fields |
* ------------------------------------------------------- */
@@ -1061,6 +1100,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
+ private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted)
private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _numExecutorsToAdd()
@@ -1136,6 +1176,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _onExecutorBusy(id)
}
+ private def onSpeculativeTaskSubmitted(manager: ExecutorAllocationManager, id: String) : Unit = {
+ manager invokePrivate _onSpeculativeTaskSubmitted(id)
+ }
+
private def localityAwareTasks(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _localityAwareTasks()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 6f1663b..ae43f4c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -60,6 +60,10 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
exception: Option[Throwable]): Unit = {
taskScheduler.taskSetsFailed += taskSet.id
}
+
+ override def speculativeTaskSubmitted(task: Task[_]): Unit = {
+ taskScheduler.speculativeTasks += task.partitionId
+ }
}
// Get the rack for a given host
@@ -92,6 +96,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val endedTasks = new mutable.HashMap[Long, TaskEndReason]
val finishedManagers = new ArrayBuffer[TaskSetManager]
val taskSetsFailed = new ArrayBuffer[String]
+ val speculativeTasks = new ArrayBuffer[Int]
val executors = new mutable.HashMap[String, String]
for ((execId, host) <- liveExecutors) {
@@ -139,6 +144,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
}
}
+
override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
}
@@ -929,6 +935,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// > 0ms, so advance the clock by 1ms here.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(3))
+
// Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption5.isDefined)
@@ -1016,6 +1024,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// > 0ms, so advance the clock by 1ms here.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(3, 4))
// Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption5.isDefined)