You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by wu...@apache.org on 2022/06/29 05:39:52 UTC

[spark] branch master updated: [SPARK-32170][CORE] Improve the speculation through the stage task metrics

This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f1046afa40 [SPARK-32170][CORE] Improve the speculation through the stage task metrics
6f1046afa40 is described below

commit 6f1046afa40096f477b29beecca5ca6286dfa7f3
Author: weixiuli <we...@jd.com>
AuthorDate: Wed Jun 29 13:39:03 2022 +0800

    [SPARK-32170][CORE] Improve the speculation through the stage task metrics
    
    ### What changes were proposed in this pull request?
    
    Currently, the mechanism of speculation is as follows:
    1. The number of successful tasks more than spark.speculation.quantile(default is 0.75) * numTasks.
    2. When some unsuccessful tasks run for more than spark.speculation.multiplier( default is 1.5) * medianDuration, they will speculate.
    
    The mechanism means that it will be the last 10% of the tasks subject to speculate, as long as above conditions are met.
    
    For example: A reduce stage whose TaskSet's size is 10, and the TaskSet state is  as follows:
    
    taskIndex | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9
    -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
    All   records need to deal with | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 18000 | 18000
    Records   that have been processed | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 10000 | 11000 | 15000
    runTime(seconds) | 20 | 20 | 20 | 20 | 20 | 20 | 20 | 20 | 30 | 30
    progressRateļ¼ˆRecords   that have been processed/runTimeļ¼‰ | 500 | 500 | 500 | 500 | 500 | 500 | 500 | 500 | 183.33 | 500
    completed | true | true | true | true | true | true | true | true | false | false
    
    In the current speculation mechanism, both taskIndex 8 and taskIndex 9 will speculate, however, only taskIndex 8 needs to speculate, because only taskIndex 8 is inefficient.
    
    In our production, there are more than 110 million speculation tasks every day, but only 30% of speculation tasks are successful finally. To analysis the unsuccessful speculation tasks, and found that the original tasks of speculation are more efficient, which might be unnecessary to speculate at all.  The unnecessary speculative tasks not only waste cluster resources but also interfere with the scheduling of other tasks.
    
    This pr will try to improve the speculation through the stage task metrics. We use stage task metrics(inputMetrics and shuffleReadMetrics) and task runtimes to evaluate the efficiency of task processing, and  evaluate the inefficient tasks by measuring successful ones , and only need to speculate the inefficient tasks.  With this pr,  we may only speculate the taskIndex 8 in the example above,  which makes more sense and helps optimize cluster resources.
    
    In addition, in order to avoid regression, we should try to speculate as much as possible for long-running tasks.
    
    ### Why are the changes needed?
    Improve the speculation.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add unittests.
    
    Closes #36162 from weixiuli/improve-speculation.
    
    Authored-by: weixiuli <we...@jd.com>
    Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../org/apache/spark/executor/InputMetrics.scala   |   2 +
 .../org/apache/spark/internal/config/package.scala |  35 ++++
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  44 +++-
 .../apache/spark/scheduler/TaskSetManager.scala    | 179 ++++++++++++----
 .../spark/scheduler/TaskSetManagerSuite.scala      | 232 +++++++++++++++++++++
 docs/configuration.md                              |  35 ++++
 6 files changed, 481 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 3d15f3a0396..a398a0159cc 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -56,4 +56,6 @@ class InputMetrics private[spark] () extends Serializable {
   private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
   private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
+  // For test only
+  private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b67250b7b84..02a52e86454 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2073,6 +2073,41 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER =
+    ConfigBuilder("spark.speculation.efficiency.processRateMultiplier")
+      .doc("A multiplier that used when evaluating inefficient tasks. The higher the multiplier " +
+        "is, the more tasks will be possibly considered as inefficient.")
+      .version("3.4.0")
+      .doubleConf
+      .checkValue(v => v > 0.0 && v <= 1.0, "multiplier must be in (0.0, 1.0]")
+      .createWithDefault(0.75)
+
+  private[spark] val SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR =
+    ConfigBuilder("spark.speculation.efficiency.longRunTaskFactor")
+      .doc(s"A task will be speculated anyway as long as its duration has exceeded the value of " +
+        s"multiplying the factor and the time threshold (either be ${SPECULATION_MULTIPLIER.key} " +
+        s"* successfulTaskDurations.median or ${SPECULATION_MIN_THRESHOLD.key}) regardless of " +
+        s"it's data process rate is good or not. This avoids missing the inefficient tasks when " +
+        s"task slow isn't related to data process rate.")
+      .version("3.4.0")
+      .doubleConf
+      .checkValue(_ >= 1.0, "Duration factor must be >= 1.0")
+      .createWithDefault(2.0)
+
+  private[spark] val SPECULATION_EFFICIENCY_ENABLE =
+    ConfigBuilder("spark.speculation.efficiency.enabled")
+      .doc(s"When set to true, spark will evaluate the efficiency of task processing through the " +
+        s"stage task metrics or its duration, and only need to speculate the inefficient tasks. " +
+        s"A task is inefficient when 1)its data process rate is less than the average data " +
+        s"process rate of all successful tasks in the stage multiplied by a multiplier or 2)its " +
+        s"duration has exceeded the value of multiplying " +
+        s"${SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR.key} and the time threshold (either be " +
+        s"${SPECULATION_MULTIPLIER.key} * successfulTaskDurations.median or " +
+        s"${SPECULATION_MIN_THRESHOLD.key}).")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val DECOMMISSION_ENABLED =
     ConfigBuilder("spark.decommission.enabled")
       .doc("When decommission enabled, Spark will try its best to shutdown the executor " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d20b534ee63..9bd6b976f40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
+import org.apache.spark.InternalAccumulator.{input, shuffleRead}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.ExecutorMetrics
@@ -103,6 +104,9 @@ private[spark] class TaskSchedulerImpl(
   // of tasks that are very short.
   val MIN_TIME_TO_SPECULATION = conf.get(SPECULATION_MIN_THRESHOLD)
 
+  private[scheduler] val efficientTaskCalcualtionEnabled = conf.get(SPECULATION_ENABLED) &&
+    conf.get(SPECULATION_EFFICIENCY_ENABLE)
+
   private val speculationScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
 
@@ -853,8 +857,13 @@ private[spark] class TaskSchedulerImpl(
     // (taskId, stageId, stageAttemptId, accumUpdates)
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
       accumUpdates.flatMap { case (id, updates) =>
-        val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
         Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
+          val (accInfos, taskProcessRate) = getTaskAccumulableInfosAndProcessRate(updates)
+          if (efficientTaskCalcualtionEnabled && taskProcessRate > 0.0) {
+            taskSetMgr.taskProcessRateCalculator.foreach {
+              _.updateRunningTaskProcessRate(id, taskProcessRate)
+            }
+          }
           (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }
       }
@@ -863,6 +872,39 @@ private[spark] class TaskSchedulerImpl(
       executorUpdates)
   }
 
+ private def getTaskAccumulableInfosAndProcessRate(
+     updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+   var recordsRead = 0L
+   var executorRunTime = 0L
+   val accInfos = updates.map { acc =>
+     if (efficientTaskCalcualtionEnabled && acc.name.isDefined) {
+       val name = acc.name.get
+       if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+         recordsRead += acc.value.asInstanceOf[Long]
+       } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+         executorRunTime = acc.value.asInstanceOf[Long]
+       }
+     }
+     acc.toInfo(Some(acc.value), None)
+   }
+   val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
+     getTaskProcessRate(recordsRead, executorRunTime)
+   } else {
+     0.0D
+   }
+   (accInfos, taskProcessRate)
+ }
+
+  private[scheduler] def getTaskProcessRate(
+      recordsRead: Long,
+      executorRunTime: Long): Double = {
+    if (executorRunTime > 0 && recordsRead > 0) {
+      recordsRead / (executorRunTime / 1000.0)
+    } else {
+      0.0D
+    }
+  }
+
   def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
     taskSetManager.handleTaskGettingResult(tid)
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 437f4860ed8..1636587e9dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -27,6 +27,8 @@ import scala.math.max
 import scala.util.control.NonFatal
 
 import org.apache.spark._
+import org.apache.spark.InternalAccumulator
+import org.apache.spark.InternalAccumulator.{input, shuffleRead}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.{config, Logging}
@@ -80,12 +82,17 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
 
   val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  private val efficientTaskProcessMultiplier =
+    conf.get(SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER)
+  private val efficientTaskDurationFactor = conf.get(SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR)
+
   // Quantile of tasks at which to start speculation
   val speculationQuantile = conf.get(SPECULATION_QUANTILE)
   val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
   val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
   // User provided threshold for speculation regardless of whether the quantile has been reached
   val speculationTaskDurationThresOpt = conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
+  private val isSpeculationThresholdSpecified = speculationTaskDurationThresOpt.exists(_ > 0)
   // SPARK-29976: Only when the total number of tasks in the stage is less than or equal to the
   // number of slots on a single executor, would the task manager speculative run the tasks if
   // their duration is longer than the given threshold. In this way, we wouldn't speculate too
@@ -109,6 +116,13 @@ private[spark] class TaskSetManager(
   private val executorDecommissionKillInterval =
     conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(TimeUnit.SECONDS.toMillis)
 
+  private[scheduler] val taskProcessRateCalculator =
+    if (sched.efficientTaskCalcualtionEnabled) {
+      Some(new TaskProcessRateCalculator())
+    } else {
+      None
+    }
+
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
   // be re-run because the missing map data needs to be regenerated first.
@@ -801,6 +815,7 @@ private[spark] class TaskSetManager(
     info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     if (speculationEnabled) {
       successfulTaskDurations.insert(info.duration)
+      taskProcessRateCalculator.foreach(_.updateAvgTaskProcessRate(tid, result))
     }
     removeRunningTask(tid)
 
@@ -1069,25 +1084,66 @@ private[spark] class TaskSetManager(
    * Check if the task associated with the given tid has past the time threshold and should be
    * speculative run.
    */
-  private def checkAndSubmitSpeculatableTask(
-      tid: Long,
+  private def checkAndSubmitSpeculatableTasks(
       currentTimeMillis: Long,
-      threshold: Double): Boolean = {
-    val info = taskInfos(tid)
-    val index = info.index
-    if (!successful(index) && copiesRunning(index) == 1 &&
-        info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) {
-      addPendingTask(index, speculatable = true)
-      logInfo(
-        ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
-          " than %.0f ms(%d speculatable tasks in this taskset now)")
-          .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
-      speculatableTasks += index
-      sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
-      true
-    } else {
-      false
+      threshold: Double,
+      customizedThreshold: Boolean = false): Boolean = {
+    var foundTasksResult = false
+    for (tid <- runningTasksSet) {
+      val info = taskInfos(tid)
+      val index = info.index
+      if (!successful(index) && copiesRunning(index) == 1 && !speculatableTasks.contains(index)) {
+        val runtimeMs = info.timeRunning(currentTimeMillis)
+
+        def checkMaySpeculate(): Boolean = {
+          if (customizedThreshold || taskProcessRateCalculator.isEmpty) {
+            true
+          } else {
+            isInefficient()
+          }
+        }
+
+        def isInefficient(): Boolean = {
+          (runtimeMs > efficientTaskDurationFactor * threshold) || taskProcessRateIsInefficient()
+        }
+
+        def taskProcessRateIsInefficient(): Boolean = {
+          taskProcessRateCalculator.forall(calculator => {
+            calculator.getRunningTasksProcessRate(tid) <
+              calculator.getAvgTaskProcessRate() * efficientTaskProcessMultiplier
+          })
+        }
+
+        def shouldSpeculateForExecutorDecommissioning(): Boolean = {
+          !customizedThreshold && executorDecommissionKillInterval.isDefined &&
+            !successfulTaskDurations.isEmpty() &&
+            sched.getExecutorDecommissionState(info.executorId).exists { decomState =>
+              // Check if this task might finish after this executor is decommissioned.
+              // We estimate the task's finish time by using the median task duration.
+              // Whereas the time when the executor might be decommissioned is estimated using the
+              // config executorDecommissionKillInterval. If the task is going to finish after
+              // decommissioning, then we will eagerly speculate the task.
+              val taskEndTimeBasedOnMedianDuration =
+                info.launchTime + successfulTaskDurations.median
+              val executorDecomTime = decomState.startTime + executorDecommissionKillInterval.get
+              executorDecomTime < taskEndTimeBasedOnMedianDuration
+            }
+        }
+        val speculated = (runtimeMs > threshold) && checkMaySpeculate() ||
+          shouldSpeculateForExecutorDecommissioning()
+        if (speculated) {
+          addPendingTask(index, speculatable = true)
+          logInfo(
+            ("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
+              " than %.0f ms(%d speculatable tasks in this taskset now)")
+              .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
+          speculatableTasks += index
+          sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
+        }
+        foundTasksResult |= speculated
+      }
     }
+    foundTasksResult
   }
 
   /**
@@ -1099,7 +1155,7 @@ private[spark] class TaskSetManager(
     // No need to speculate if the task set is zombie or is from a barrier stage. If there is only
     // one task we don't speculate since we don't have metrics to decide whether it's taking too
     // long or not, unless a task duration threshold is explicitly provided.
-    if (isZombie || isBarrier || (numTasks == 1 && !speculationTaskDurationThresOpt.isDefined)) {
+    if (isZombie || isBarrier || (numTasks == 1 && !isSpeculationThresholdSpecified)) {
       return false
     }
     var foundTasks = false
@@ -1109,40 +1165,24 @@ private[spark] class TaskSetManager(
     // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we should only count the
     // tasks that are submitted by this `TaskSetManager` and are completed successfully.
     val numSuccessfulTasks = successfulTaskDurations.size()
+    val timeMs = clock.getTimeMillis()
     if (numSuccessfulTasks >= minFinishedForSpeculation) {
-      val time = clock.getTimeMillis()
       val medianDuration = successfulTaskDurations.median
       val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
       // TODO: Threshold should also look at standard deviation of task durations and have a lower
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
-      for (tid <- runningTasksSet) {
-        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
-        if (!speculated && executorDecommissionKillInterval.isDefined) {
-          val taskInfo = taskInfos(tid)
-          val decomState = sched.getExecutorDecommissionState(taskInfo.executorId)
-          if (decomState.isDefined) {
-            // Check if this task might finish after this executor is decommissioned.
-            // We estimate the task's finish time by using the median task duration.
-            // Whereas the time when the executor might be decommissioned is estimated using the
-            // config executorDecommissionKillInterval. If the task is going to finish after
-            // decommissioning, then we will eagerly speculate the task.
-            val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
-            val executorDecomTime = decomState.get.startTime + executorDecommissionKillInterval.get
-            val canExceedDeadline = executorDecomTime < taskEndTimeBasedOnMedianDuration
-            if (canExceedDeadline) {
-              speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
-            }
-          }
-        }
-        foundTasks |= speculated
-      }
-    } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
-      val time = clock.getTimeMillis()
+      foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold)
+    } else if (isSpeculationThresholdSpecified && speculationTasksLessEqToSlots) {
       val threshold = speculationTaskDurationThresOpt.get
       logDebug(s"Tasks taking longer time than provided speculation threshold: $threshold")
-      for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+      foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold, customizedThreshold = true)
+    }
+    // avoid more warning logs.
+    if (foundTasks) {
+      val elapsedMs = clock.getTimeMillis() - timeMs
+      if (elapsedMs > minTimeToSpeculation) {
+        logWarning(s"Time to checkSpeculatableTasks ${elapsedMs}ms > ${minTimeToSpeculation}ms")
       }
     }
     foundTasks
@@ -1218,6 +1258,55 @@ private[spark] class TaskSetManager(
   def executorAdded(): Unit = {
     recomputeLocality()
   }
+
+  /**
+   * A class for checking inefficient tasks to be speculated, a task is inefficient when its data
+   * process rate is less than the average data process rate of all successful tasks in the stage
+   * multiplied by a multiplier.
+   */
+  private[TaskSetManager] class TaskProcessRateCalculator {
+    private var totalRecordsRead = 0L
+    private var totalExecutorRunTime = 0L
+    private var avgTaskProcessRate = Double.MaxValue
+    private val runningTasksProcessRate = new ConcurrentHashMap[Long, Double]()
+
+    private[TaskSetManager] def getAvgTaskProcessRate(): Double = {
+      avgTaskProcessRate
+    }
+
+    private[TaskSetManager] def getRunningTasksProcessRate(taskId: Long): Double = {
+      runningTasksProcessRate.getOrDefault(taskId, 0.0)
+    }
+
+    private[TaskSetManager] def updateAvgTaskProcessRate(
+        taskId: Long,
+        result: DirectTaskResult[_]): Unit = {
+      var recordsRead = 0L
+      var executorRunTime = 0L
+      result.accumUpdates.foreach { a =>
+        if (a.name == Some(shuffleRead.RECORDS_READ) ||
+          a.name == Some(input.RECORDS_READ)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          recordsRead += acc.value
+        } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+          val acc = a.asInstanceOf[LongAccumulator]
+          executorRunTime = acc.value
+        }
+      }
+      totalRecordsRead += recordsRead
+      totalExecutorRunTime += executorRunTime
+      if (totalRecordsRead > 0 && totalExecutorRunTime > 0) {
+        avgTaskProcessRate = sched.getTaskProcessRate(totalRecordsRead, totalExecutorRunTime)
+      }
+      runningTasksProcessRate.remove(taskId)
+    }
+
+    private[scheduler] def updateRunningTaskProcessRate(
+        taskId: Long,
+        taskProcessRate: Double): Unit = {
+      runningTasksProcessRate.put(taskId, taskProcessRate)
+    }
+  }
 }
 
 private[spark] object TaskSetManager {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index f21daa1aea6..c1c62ea4b85 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import java.nio.ByteBuffer
 import java.util.{Properties, Random}
 
 import scala.collection.mutable
@@ -32,6 +33,7 @@ import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{FakeSchedulerBackend => _, _}
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.SKIP_VALIDATE_CORES_TESTING
@@ -203,6 +205,9 @@ class TaskSetManagerSuite
 
   val LOCALITY_WAIT_MS = conf.get(config.LOCALITY_WAIT)
   val MAX_TASK_FAILURES = 4
+  val SUBMISSION_TIME = 0L
+  val RUNTIME = 20 * 1000
+  val RECORDS_NUM = 10000L
 
   var sched: FakeTaskScheduler = null
 
@@ -2245,6 +2250,233 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.size == 1)
   }
 
+  private def createTaskMetrics(
+       taskSet: TaskSet,
+       inefficientTaskIds: Set[Int],
+       efficientMultiplier: Double = 0.6): Array[TaskMetrics] = {
+    taskSet.tasks.zipWithIndex.map { case (task, index) =>
+      val metrics = task.metrics
+      if (inefficientTaskIds.contains(index)) {
+        updateAndGetTaskMetrics(metrics, efficientMultiplier)
+      } else {
+        updateAndGetTaskMetrics(metrics, 1)
+      }
+    }
+  }
+
+  private def updateAndGetTaskMetrics(
+      taskMetrics: TaskMetrics,
+      efficientMultiplier: Double): TaskMetrics = {
+    taskMetrics.inputMetrics.setRecordsRead((efficientMultiplier * RECORDS_NUM).toLong)
+    taskMetrics.shuffleReadMetrics.setRecordsRead((efficientMultiplier * RECORDS_NUM).toLong)
+    taskMetrics.setExecutorRunTime(RUNTIME)
+    taskMetrics
+  }
+
+  test("SPARK-32170: test speculation for TaskSet with single task") {
+    val conf = new SparkConf()
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    Seq(0, 15).foreach { duration =>
+      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, duration.toString)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val numTasks = 1
+      val taskSet = FakeTask.createTaskSet(numTasks)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+      for ((k, v) <- List("exec1" -> "host1")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (duration <= 0) {
+        assert(!manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      } else {
+        assert(manager.checkSpeculatableTasks(0))
+        assert(sched.speculativeTasks.toSet === Set(0))
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_MIN_THRESHOLD for speculating inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are launched based on
+    // minTimeToSpeculation parameter to checkSpeculatableTasks
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    val speculativeAllDurations = Set(0)
+    val speculativeInefficientDurations = Set(10000)
+    val nonSpeculativeDurations = Set(50000)
+    (speculativeAllDurations ++ speculativeInefficientDurations
+      ++ nonSpeculativeDurations).foreach { minDuration =>
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(5)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), efficientMultiplier = 0.4)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 5 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 2 task in running
+      val task3Metrics: TaskMetrics =
+        ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, task3Metrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+
+      updateAndGetTaskMetrics(taskMetricsByTask(4), efficientMultiplier = 5)
+      val task4Metrics: TaskMetrics =
+        ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(4)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((4, task4Metrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 1) when SPECULATION_MIN_THRESHOLD is equal 0s, both the task 3 and the task 4 will be
+      // speculated by previous strategy.
+      // 2) when SPECULATION_MIN_THRESHOLD is equal 10s and runtime(20s) is above 10s, the task 3
+      //  will be evaluated an inefficient task to speculate, but the task 4 will not.
+      // 3) when SPECULATION_MIN_THRESHOLD is equal 50s, the task 3 and the task 4 runtime(20s) is
+      // less than (50s) and no needs to speculate at all.
+      if (speculativeAllDurations.contains(minDuration)) {
+        assert(manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set(3, 4))
+      } else if (speculativeInefficientDurations.contains(minDuration)) {
+        assert(manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        assert(!manager.checkSpeculatableTasks(minDuration))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    }
+  }
+
+  test("SPARK-32170: test SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER for " +
+    "speculating inefficient tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are launched based on
+    // minTimeToSpeculation parameter to checkSpeculatableTasks
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(0.5, 0.8).foreach(processMultiplier => {
+      sc.conf.set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER, processMultiplier)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), efficientMultiplier = 0.6)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // 0.5 < 0.6 < 0.8
+      if (processMultiplier == 0.8) {
+        assert(manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        assert(!manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    })
+  }
+
+  test("SPARK-32170: test SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR for " +
+    "speculating tasks") {
+    // set the speculation multiplier to be 0, so speculative tasks are launched based on
+    // minTimeToSpeculation parameter to checkSpeculatableTasks
+    val conf = new SparkConf()
+      .set(config.SPECULATION_MULTIPLIER, 0.0)
+      .set(config.SPECULATION_ENABLED, true)
+      .set(config.SPECULATION_EFFICIENCY_TASK_PROCESS_RATE_MULTIPLIER, 0.5)
+    sc = new SparkContext("local", "test", conf)
+    val ser = sc.env.closureSerializer.newInstance()
+    Seq(1.0, 2.0).foreach(factor => {
+      sc.conf.set(config.SPECULATION_EFFICIENCY_TASK_DURATION_FACTOR, factor)
+      sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+      val taskSet = FakeTask.createTaskSet(4)
+      val clock = new ManualClock()
+      val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+      val taskMetricsByTask = createTaskMetrics(taskSet, Set(3), efficientMultiplier = 0.6)
+      val blockManagerId = BlockManagerId("exec1", "localhost", 12345)
+      // offer resources for 4 tasks to start
+      for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+        val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
+        assert(taskOption.isDefined)
+        val task = taskOption.get
+        sched.taskIdToTaskSetManager.put(task.taskId, manager)
+      }
+      clock.advance(RUNTIME)
+      // complete the 3 tasks and leave 1 task in running
+      val taskMetrics: TaskMetrics =
+        ser.deserialize(ByteBuffer.wrap(ser.serialize(taskMetricsByTask(3)).array()))
+      sched.executorHeartbeatReceived("exec1", Array((3, taskMetrics.internalAccums)),
+        blockManagerId, mutable.Map.empty[(Int, Int), ExecutorMetrics])
+      for (id <- Set(0, 1, 2)) {
+        val resultBytes = ser.serialize(createTaskResult(id, taskMetricsByTask(id).internalAccums))
+        sched.statusUpdate(tid = id, state = TaskState.FINISHED, serializedData = resultBytes)
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(sched.endedTasks(id) === Success)
+        }
+      }
+      // runtimeMs(20s) > 15s(1 * 15s)
+      if (factor == 1.0) {
+        assert(manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set(3))
+      } else {
+        // runtimeMs(20s) < 30s(2 * 15s)
+        assert(!manager.checkSpeculatableTasks(15000))
+        assert(sched.speculativeTasks.toSet === Set.empty)
+      }
+    })
+  }
+
   test("SPARK-37580: Reset numFailures when one of task attempts succeeds") {
     sc = new SparkContext("local", "test")
     // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
diff --git a/docs/configuration.md b/docs/configuration.md
index a6d2a8b9d52..fd189aa88b6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2476,6 +2476,41 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>3.0.0</td>
 </tr>
+<tr>
+  <td><code>spark.speculation.efficiency.processRateMultiplier</code></td>
+  <td>0.75</td>
+  <td>
+    A multiplier that used when evaluating inefficient tasks. The higher the multiplier
+    is, the more tasks will be possibly considered as inefficient.
+  </td>
+  <td>3.4.0</td>
+</tr>
+<tr>
+  <td><code>spark.speculation.efficiency.longRunTaskFactor</code></td>
+  <td>2</td>
+  <td>
+    A task will be speculated anyway as long as its duration has exceeded the value of multiplying
+    the factor and the time threshold (either be <code>spark.speculation.multiplier</code>
+    * successfulTaskDurations.median or <code>spark.speculation.minTaskRuntime</code>) regardless
+    of it's data process rate is good or not. This avoids missing the inefficient tasks when task
+    slow isn't related to data process rate.
+  </td>
+  <td>3.4.0</td>
+</tr>
+<tr>
+  <td><code>spark.speculation.efficiency.enabled</code></td>
+  <td>true</td>
+  <td>
+    When set to true, spark will evaluate the efficiency of task processing through the stage task
+    metrics or its duration, and only need to speculate the inefficient tasks. A task is inefficient
+    when 1)its data process rate is less than the average data process rate of all successful tasks
+    in the stage multiplied by a multiplier or 2)its duration has exceeded the value of multiplying
+     <code>spark.speculation.efficiency.longRunTaskFactor</code> and the time threshold (either be
+     <code>spark.speculation.multiplier</code> * successfulTaskDurations.median or
+    <code>spark.speculation.minTaskRuntime</code>).
+  </td>
+  <td>3.4.0</td>
+</tr>
 <tr>
   <td><code>spark.task.cpus</code></td>
   <td>1</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org