You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2019/05/07 17:02:22 UTC

[spark] branch master updated: [SPARK-27590][CORE] do not consider skipped tasks when scheduling speculative tasks

This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d124ce9  [SPARK-27590][CORE] do not consider skipped tasks when scheduling speculative tasks
d124ce9 is described below

commit d124ce9c7ea537c59a776c05977c9f918d38febc
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue May 7 12:02:08 2019 -0500

    [SPARK-27590][CORE] do not consider skipped tasks when scheduling speculative tasks
    
    ## What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/24375
    
    When `TaskSetManager` skips a task because its corresponding partition is already completed by other `TaskSetManager`s, we should not consider the duration of the task that is finished by other `TaskSetManager`s to schedule the speculative tasks of this `TaskSetManager`.
    
    ## How was this patch tested?
    
    updated test case
    
    Closes #24485 from cloud-fan/minor.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Imran Rashid <ir...@cloudera.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  3 +--
 .../apache/spark/scheduler/TaskResultGetter.scala  |  5 ++---
 .../org/apache/spark/scheduler/TaskScheduler.scala |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 12 ++++-------
 .../apache/spark/scheduler/TaskSetManager.scala    | 24 +++++++++++-----------
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |  6 ++----
 .../scheduler/ExternalClusterManagerSuite.scala    |  3 +--
 .../spark/scheduler/TaskSetManagerSuite.scala      |  4 ++--
 8 files changed, 25 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b817eb6..1d4972e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1394,8 +1394,7 @@ private[spark] class DAGScheduler(
         // finished. Here we notify the task scheduler to skip running tasks for the same partition,
         // to save resource.
         if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
-          taskScheduler.notifyPartitionCompletion(
-            stageId, task.partitionId, event.taskInfo.duration)
+          taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
         }
 
         task match {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 09c4d9b..9b7f901 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -158,10 +158,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
   // This method calls `TaskSchedulerImpl.handlePartitionCompleted` asynchronously. We do not want
   // DAGScheduler to call `TaskSchedulerImpl.handlePartitionCompleted` directly, as it's
   // synchronized and may hurt the throughput of the scheduler.
-  def enqueuePartitionCompletionNotification(
-      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+  def enqueuePartitionCompletionNotification(stageId: Int, partitionId: Int): Unit = {
     getTaskResultExecutor.execute(() => Utils.logUncaughtExceptions {
-      scheduler.handlePartitionCompleted(stageId, partitionId, taskDuration)
+      scheduler.handlePartitionCompleted(stageId, partitionId)
     })
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1862e16..bfdbf02 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -70,7 +70,7 @@ private[spark] trait TaskScheduler {
 
   // Notify the corresponding `TaskSetManager`s of the stage, that a partition has already completed
   // and they can skip running tasks for it.
-  def notifyPartitionCompletion(stageId: Int, partitionId: Int, taskDuration: Long)
+  def notifyPartitionCompletion(stageId: Int, partitionId: Int)
 
   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7e820c3..532eb32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -301,9 +301,8 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  override def notifyPartitionCompletion(
-      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
-    taskResultGetter.enqueuePartitionCompletionNotification(stageId, partitionId, taskDuration)
+  override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
+    taskResultGetter.enqueuePartitionCompletionNotification(stageId, partitionId)
   }
 
   /**
@@ -651,12 +650,9 @@ private[spark] class TaskSchedulerImpl(
    * means that a task completion from an earlier zombie attempt can lead to the entire stage
    * getting marked as successful.
    */
-  private[scheduler] def handlePartitionCompleted(
-      stageId: Int,
-      partitionId: Int,
-      taskDuration: Long) = synchronized {
+  private[scheduler] def handlePartitionCompleted(stageId: Int, partitionId: Int) = synchronized {
     taskSetsByStageIdAndAttempt.get(stageId).foreach(_.values.filter(!_.isZombie).foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId, taskDuration)
+      tsm.markPartitionCompleted(partitionId)
     })
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b3aa814..52323b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -62,14 +62,8 @@ private[spark] class TaskSetManager(
   private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*)
   private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
 
-  // Quantile of tasks at which to start speculation
-  val speculationQuantile = conf.get(SPECULATION_QUANTILE)
-  val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
-
   val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
 
-  val speculationEnabled = conf.get(SPECULATION_ENABLED)
-
   // Serializer for closures and tasks.
   val env = SparkEnv.get
   val ser = env.closureSerializer.newInstance()
@@ -80,6 +74,12 @@ private[spark] class TaskSetManager(
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
+  val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  // Quantile of tasks at which to start speculation
+  val speculationQuantile = conf.get(SPECULATION_QUANTILE)
+  val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
+  val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
+
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
   // be re-run because the missing map data needs to be regenerated first.
@@ -816,12 +816,9 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int, taskDuration: Long): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
-        if (speculationEnabled && !isZombie) {
-          successfulTaskDurations.insert(taskDuration)
-        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
@@ -1035,10 +1032,13 @@ private[spark] class TaskSetManager(
       return false
     }
     var foundTasks = false
-    val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
 
-    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
+    // It's possible that a task is marked as completed by the scheduler, then the size of
+    // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we should only count the
+    // tasks that are submitted by this `TaskSetManager` and are completed successfully.
+    val numSuccessfulTasks = successfulTaskDurations.size()
+    if (numSuccessfulTasks >= minFinishedForSpeculation) {
       val time = clock.getTimeMillis()
       val medianDuration = successfulTaskDurations.median
       val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 749e47c..d58ee4e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -157,8 +157,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
     override def killAllTaskAttempts(
       stageId: Int, interruptThread: Boolean, reason: String): Unit = {}
-    override def notifyPartitionCompletion(
-        stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+    override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
       taskSets.filter(_.stageId == stageId).lastOption.foreach { ts =>
         val tasks = ts.tasks.filter(_.partitionId == partitionId)
         assert(tasks.length == 1)
@@ -668,8 +667,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
           stageId: Int, interruptThread: Boolean, reason: String): Unit = {
         throw new UnsupportedOperationException
       }
-      override def notifyPartitionCompletion(
-          stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+      override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
         throw new UnsupportedOperationException
       }
       override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 347064d..ead34e5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -84,8 +84,7 @@ private class DummyTaskScheduler extends TaskScheduler {
     taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
   override def killAllTaskAttempts(
     stageId: Int, interruptThread: Boolean, reason: String): Unit = {}
-  override def notifyPartitionCompletion(
-      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {}
+  override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 0666bc3..72c6ab9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1394,8 +1394,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 
     val taskSetManager = sched.taskSetManagerForAttempt(0, 0).get
     assert(taskSetManager.runningTasks === 8)
-    taskSetManager.markPartitionCompleted(8, 0)
-    assert(!taskSetManager.successfulTaskDurations.isEmpty())
+    taskSetManager.markPartitionCompleted(8)
+    assert(taskSetManager.successfulTaskDurations.isEmpty())
     taskSetManager.checkSpeculatableTasks(0)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org