You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2022/11/17 07:46:54 UTC

[spark] branch master updated: [SPARK-40979][CORE] Keep removed executor info due to decommission

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a428e44dbc3 [SPARK-40979][CORE] Keep removed executor info due to decommission
a428e44dbc3 is described below

commit a428e44dbc3795454ed8f96e1b165e6028f939f3
Author: Warren Zhu <wa...@gmail.com>
AuthorDate: Thu Nov 17 01:46:43 2022 -0600

    [SPARK-40979][CORE] Keep removed executor info due to decommission
    
    ### What changes were proposed in this pull request?
    Keep removed executor info due to decommission in a separate map. Keep the max map size as 10000 to avoid excessive memory usage.
    
    ### Why are the changes needed?
    Get info about whether executor is decommission even after this executor is removed
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Updated test in `TaskSchedulerImplSuite`
    
    Closes #38441 from warrenzhu25/ignore-followup.
    
    Authored-by: Warren Zhu <wa...@gmail.com>
    Signed-off-by: Mridul <mridul<at>gmail.com>
---
 .../org/apache/spark/internal/config/package.scala | 11 +++++++
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  8 +++--
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 13 +++++++-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 36 +++++++++++++++++++++-
 4 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ad899d7dfd6..eb6ac8b765b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2231,6 +2231,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val SCHEDULER_MAX_RETAINED_REMOVED_EXECUTORS =
+    ConfigBuilder("spark.scheduler.maxRetainedRemovedDecommissionExecutors")
+      .internal()
+      .doc("Max number of removed executors by decommission to retain. This affects " +
+        "whether fetch failure caused by removed decommissioned executors could be ignored " +
+        s"when ${STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key} is enabled.")
+      .version("3.4.0")
+      .intConf
+      .checkValue(_ >= 0, "needs to be a non-negative value")
+      .createWithDefault(0)
+
   private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
     ConfigBuilder("spark.shuffle.push.enabled")
       .doc("Set to true to enable push-based shuffle on the client side and this works in " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fb3512619d8..c55d44dfd59 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2193,9 +2193,11 @@ private[spark] class DAGScheduler(
    * Return true when:
    *  1. Waiting for decommission start
    *  2. Under decommission process
-   * Return false when:
-   *  1. Stopped or terminated after finishing decommission
-   *  2. Under decommission process, then removed by driver with other reasons
+   *  3. Stopped or terminated after finishing decommission
+   *  4. Under decommission process, then removed by driver with other reasons
+   * Return false in case 3 and 4 when removed executors info are not retained.
+   * The max size of removed executors is controlled by
+   * spark.scheduler.maxRetainedRemovedExecutors
    */
   private[scheduler] def isExecutorDecommissioningOrDecommissioned(
       taskScheduler: TaskScheduler, bmAddress: BlockManagerId): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 80b66c4f675..4580ec53289 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -26,6 +26,8 @@ import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet}
 import scala.util.Random
 
+import com.google.common.cache.CacheBuilder
+
 import org.apache.spark._
 import org.apache.spark.InternalAccumulator.{input, shuffleRead}
 import org.apache.spark.TaskState.TaskState
@@ -145,6 +147,13 @@ private[spark] class TaskSchedulerImpl(
   // continue to run even after being asked to decommission, but they will eventually exit.
   val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]
 
+  // Keep removed executors due to decommission, so getExecutorDecommissionState
+  // still return correct value even after executor is lost
+  val executorsRemovedByDecom =
+    CacheBuilder.newBuilder()
+      .maximumSize(conf.get(SCHEDULER_MAX_RETAINED_REMOVED_EXECUTORS))
+      .build[String, ExecutorDecommissionState]()
+
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
   }
@@ -1022,7 +1031,8 @@ private[spark] class TaskSchedulerImpl(
 
   override def getExecutorDecommissionState(executorId: String)
     : Option[ExecutorDecommissionState] = synchronized {
-    executorsPendingDecommission.get(executorId)
+    executorsPendingDecommission.get(executorId).orElse(
+      Option(executorsRemovedByDecom.getIfPresent(executorId)))
   }
 
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
@@ -1118,6 +1128,7 @@ private[spark] class TaskSchedulerImpl(
     }
 
     executorsPendingDecommission.remove(executorId)
+      .foreach(executorsRemovedByDecom.put(executorId, _))
 
     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index b81f85bd1d7..eec5449bc72 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1930,13 +1930,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     assert(taskSlotsForRp === 4)
   }
 
-  private def setupSchedulerForDecommissionTests(clock: Clock, numTasks: Int): TaskSchedulerImpl = {
+  private def setupSchedulerForDecommissionTests(clock: Clock, numTasks: Int,
+    extraConf: Map[String, String] = Map.empty): TaskSchedulerImpl = {
     // one task per host
     val numHosts = numTasks
     val conf = new SparkConf()
       .setMaster(s"local[$numHosts]")
       .setAppName("TaskSchedulerImplSuite")
       .set(config.CPUS_PER_TASK.key, "1")
+      .setAll(extraConf)
     sc = new SparkContext(conf)
     val maxTaskFailures = sc.conf.get(config.TASK_MAX_FAILURES)
     taskScheduler = new TaskSchedulerImpl(sc, maxTaskFailures, clock = clock) {
@@ -2019,6 +2021,38 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     assert(manager.copiesRunning.take(2) === Array(1, 1))
   }
 
+  test("SPARK-40979: Keep removed executor info due to decommission") {
+    val clock = new ManualClock(10000L)
+    val scheduler = setupSchedulerForDecommissionTests(clock, 2,
+      Map(config.SCHEDULER_MAX_RETAINED_REMOVED_EXECUTORS.key -> "1"))
+    val manager = stageToMockTaskSetManager(0)
+    // The task started should be running.
+    assert(manager.copiesRunning.take(2) === Array(1, 1))
+
+    // executor 1 is decommissioned before loosing
+    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None))
+    assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
+
+    // executor1 is eventually lost
+    scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
+    assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
+
+    // executor 0 is decommissioned before loosing
+    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", None))
+    scheduler.executorLost("executor0", ExecutorExited(0, false, "normal"))
+
+    // Only last removed executor is kept as size of removed decommission executors is 1
+    assert(scheduler.getExecutorDecommissionState("executor0").isDefined)
+    assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
+
+    // Now give it some resources and both tasks should be rerun
+    val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor2", "host2", 1), WorkerOffer("executor3", "host3", 1))).flatten
+    assert(taskDescriptions.size === 2)
+    assert(taskDescriptions.map(_.index).sorted == Seq(0, 1))
+    assert(manager.copiesRunning.take(2) === Array(1, 1))
+  }
+
   test("SPARK-24818: test delay scheduling for barrier TaskSetManager") {
     val clock = new ManualClock()
     val conf = new SparkConf().set(config.LEGACY_LOCALITY_WAIT_RESET, false)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org