Posted to commits@spark.apache.org by ir...@apache.org on 2019/05/07 17:02:22 UTC
[spark] branch master updated: [SPARK-27590][CORE] do not consider skipped tasks when scheduling speculative tasks
This is an automated email from the ASF dual-hosted git repository.
irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d124ce9 [SPARK-27590][CORE] do not consider skipped tasks when scheduling speculative tasks
d124ce9 is described below
commit d124ce9c7ea537c59a776c05977c9f918d38febc
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue May 7 12:02:08 2019 -0500
[SPARK-27590][CORE] do not consider skipped tasks when scheduling speculative tasks
## What changes were proposed in this pull request?
This is a follow-up to https://github.com/apache/spark/pull/24375
When a `TaskSetManager` skips a task because its partition has already been completed by another `TaskSetManager`, the duration of that externally completed task should not be counted when scheduling this `TaskSetManager`'s speculative tasks.
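To make the effect concrete, here is a minimal, self-contained sketch (not the actual `TaskSetManager` code; names such as `speculatableCandidates` and `ownSuccessfulDurations` are hypothetical stand-ins) of the rule this patch enforces: the speculation threshold is derived only from durations this manager recorded for its own successful tasks, so a partition completed by another attempt no longer skews the median.
```scala
import scala.collection.mutable.ArrayBuffer

object SpeculationCheckSketch {
  // Returns the indices of running tasks that would be considered speculatable.
  def speculatableCandidates(
      ownSuccessfulDurations: ArrayBuffer[Long], // durations recorded by this manager only
      runningTaskElapsed: Map[Int, Long],        // task index -> elapsed millis
      numTasks: Int,
      speculationQuantile: Double = 0.75,
      speculationMultiplier: Double = 1.5,
      minTimeToSpeculation: Long = 100L): Seq[Int] = {
    val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
    // Count only tasks this manager ran itself; partitions marked complete by other
    // attempts contribute no duration and therefore cannot unlock speculation early.
    if (ownSuccessfulDurations.size < minFinishedForSpeculation) {
      Seq.empty
    } else {
      val sorted = ownSuccessfulDurations.sorted
      val medianDuration = sorted(sorted.size / 2).toDouble
      val threshold = math.max(speculationMultiplier * medianDuration, minTimeToSpeculation.toDouble)
      runningTaskElapsed.collect { case (idx, elapsed) if elapsed > threshold => idx }.toSeq
    }
  }
}
```
This mirrors the change in the diff below, where `successfulTaskDurations.size()` (rather than `tasksSuccessful`) is compared against the quantile threshold.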
## How was this patch tested?
updated test case
Closes #24485 from cloud-fan/minor.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Imran Rashid <ir...@cloudera.com>
---
.../org/apache/spark/scheduler/DAGScheduler.scala | 3 +--
.../apache/spark/scheduler/TaskResultGetter.scala | 5 ++---
.../org/apache/spark/scheduler/TaskScheduler.scala | 2 +-
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 12 ++++-------
.../apache/spark/scheduler/TaskSetManager.scala | 24 +++++++++++-----------
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 6 ++----
.../scheduler/ExternalClusterManagerSuite.scala | 3 +--
.../spark/scheduler/TaskSetManagerSuite.scala | 4 ++--
8 files changed, 25 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b817eb6..1d4972e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1394,8 +1394,7 @@ private[spark] class DAGScheduler(
// finished. Here we notify the task scheduler to skip running tasks for the same partition,
// to save resource.
if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
- taskScheduler.notifyPartitionCompletion(
- stageId, task.partitionId, event.taskInfo.duration)
+ taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
}
task match {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 09c4d9b..9b7f901 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -158,10 +158,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
// This method calls `TaskSchedulerImpl.handlePartitionCompleted` asynchronously. We do not want
// DAGScheduler to call `TaskSchedulerImpl.handlePartitionCompleted` directly, as it's
// synchronized and may hurt the throughput of the scheduler.
- def enqueuePartitionCompletionNotification(
- stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+ def enqueuePartitionCompletionNotification(stageId: Int, partitionId: Int): Unit = {
getTaskResultExecutor.execute(() => Utils.logUncaughtExceptions {
- scheduler.handlePartitionCompleted(stageId, partitionId, taskDuration)
+ scheduler.handlePartitionCompleted(stageId, partitionId)
})
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1862e16..bfdbf02 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -70,7 +70,7 @@ private[spark] trait TaskScheduler {
// Notify the corresponding `TaskSetManager`s of the stage, that a partition has already completed
// and they can skip running tasks for it.
- def notifyPartitionCompletion(stageId: Int, partitionId: Int, taskDuration: Long)
+ def notifyPartitionCompletion(stageId: Int, partitionId: Int)
// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7e820c3..532eb32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -301,9 +301,8 @@ private[spark] class TaskSchedulerImpl(
}
}
- override def notifyPartitionCompletion(
- stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
- taskResultGetter.enqueuePartitionCompletionNotification(stageId, partitionId, taskDuration)
+ override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
+ taskResultGetter.enqueuePartitionCompletionNotification(stageId, partitionId)
}
/**
@@ -651,12 +650,9 @@ private[spark] class TaskSchedulerImpl(
* means that a task completion from an earlier zombie attempt can lead to the entire stage
* getting marked as successful.
*/
- private[scheduler] def handlePartitionCompleted(
- stageId: Int,
- partitionId: Int,
- taskDuration: Long) = synchronized {
+ private[scheduler] def handlePartitionCompleted(stageId: Int, partitionId: Int) = synchronized {
taskSetsByStageIdAndAttempt.get(stageId).foreach(_.values.filter(!_.isZombie).foreach { tsm =>
- tsm.markPartitionCompleted(partitionId, taskDuration)
+ tsm.markPartitionCompleted(partitionId)
})
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b3aa814..52323b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -62,14 +62,8 @@ private[spark] class TaskSetManager(
private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*)
private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
- // Quantile of tasks at which to start speculation
- val speculationQuantile = conf.get(SPECULATION_QUANTILE)
- val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
-
val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
- val speculationEnabled = conf.get(SPECULATION_ENABLED)
-
// Serializer for closures and tasks.
val env = SparkEnv.get
val ser = env.closureSerializer.newInstance()
@@ -80,6 +74,12 @@ private[spark] class TaskSetManager(
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
+ val speculationEnabled = conf.get(SPECULATION_ENABLED)
+ // Quantile of tasks at which to start speculation
+ val speculationQuantile = conf.get(SPECULATION_QUANTILE)
+ val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
+ val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
+
// For each task, tracks whether a copy of the task has succeeded. A task will also be
// marked as "succeeded" if it failed with a fetch failure, in which case it should not
// be re-run because the missing map data needs to be regenerated first.
@@ -816,12 +816,9 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet()
}
- private[scheduler] def markPartitionCompleted(partitionId: Int, taskDuration: Long): Unit = {
+ private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
partitionToIndex.get(partitionId).foreach { index =>
if (!successful(index)) {
- if (speculationEnabled && !isZombie) {
- successfulTaskDurations.insert(taskDuration)
- }
tasksSuccessful += 1
successful(index) = true
if (tasksSuccessful == numTasks) {
@@ -1035,10 +1032,13 @@ private[spark] class TaskSetManager(
return false
}
var foundTasks = false
- val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
- if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
+ // It's possible that a task is marked as completed by the scheduler, then the size of
+ // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we should only count the
+ // tasks that are submitted by this `TaskSetManager` and are completed successfully.
+ val numSuccessfulTasks = successfulTaskDurations.size()
+ if (numSuccessfulTasks >= minFinishedForSpeculation) {
val time = clock.getTimeMillis()
val medianDuration = successfulTaskDurations.median
val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 749e47c..d58ee4e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -157,8 +157,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
override def killAllTaskAttempts(
stageId: Int, interruptThread: Boolean, reason: String): Unit = {}
- override def notifyPartitionCompletion(
- stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+ override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
taskSets.filter(_.stageId == stageId).lastOption.foreach { ts =>
val tasks = ts.tasks.filter(_.partitionId == partitionId)
assert(tasks.length == 1)
@@ -668,8 +667,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
stageId: Int, interruptThread: Boolean, reason: String): Unit = {
throw new UnsupportedOperationException
}
- override def notifyPartitionCompletion(
- stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+ override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
throw new UnsupportedOperationException
}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 347064d..ead34e5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -84,8 +84,7 @@ private class DummyTaskScheduler extends TaskScheduler {
taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
override def killAllTaskAttempts(
stageId: Int, interruptThread: Boolean, reason: String): Unit = {}
- override def notifyPartitionCompletion(
- stageId: Int, partitionId: Int, taskDuration: Long): Unit = {}
+ override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 0666bc3..72c6ab9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1394,8 +1394,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSetManager = sched.taskSetManagerForAttempt(0, 0).get
assert(taskSetManager.runningTasks === 8)
- taskSetManager.markPartitionCompleted(8, 0)
- assert(!taskSetManager.successfulTaskDurations.isEmpty())
+ taskSetManager.markPartitionCompleted(8)
+ assert(taskSetManager.successfulTaskDurations.isEmpty())
taskSetManager.checkSpeculatableTasks(0)
}
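For readers skimming the diff, a condensed sketch of the notification path after this commit (the real classes live in `org.apache.spark.scheduler`; the bodies here are abbreviated placeholders, not the actual implementations): only the stage id and partition id travel down the chain, and `markPartitionCompleted` no longer records a duration.
```scala
// Abbreviated sketch of the call chain after this change; bodies are placeholders.
trait TaskSchedulerSketch {
  // DAGScheduler calls this when a later stage attempt can skip an already-finished partition.
  def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit
}

class TaskSchedulerImplSketch extends TaskSchedulerSketch {
  override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
    // In Spark this hop goes through TaskResultGetter so the synchronized handler
    // runs asynchronously and does not block the scheduler.
    handlePartitionCompleted(stageId, partitionId)
  }

  private def handlePartitionCompleted(stageId: Int, partitionId: Int): Unit = {
    // Every non-zombie TaskSetManager for the stage marks the partition as completed,
    // but without a duration, so its speculation statistics are left untouched.
  }
}
```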