Posted to commits@spark.apache.org by mr...@apache.org on 2022/11/17 07:46:54 UTC
[spark] branch master updated: [SPARK-40979][CORE] Keep removed executor info due to decommission
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a428e44dbc3 [SPARK-40979][CORE] Keep removed executor info due to decommission
a428e44dbc3 is described below
commit a428e44dbc3795454ed8f96e1b165e6028f939f3
Author: Warren Zhu <wa...@gmail.com>
AuthorDate: Thu Nov 17 01:46:43 2022 -0600
[SPARK-40979][CORE] Keep removed executor info due to decommission
### What changes were proposed in this pull request?
Keep the info of executors removed due to decommission in a separate map. Cap the map size at 10000 to avoid excessive memory usage.
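As a minimal sketch of the approach (not the patch itself: `ExecutorDecommissionState` is simplified here and the cap is hard-coded, while the diff below reads it from `spark.scheduler.maxRetainedRemovedDecommissionExecutors`, default 0), a size-bounded Guava cache keeps the most recent entries and evicts older ones:

```scala
import com.google.common.cache.CacheBuilder

// Simplified stand-in for the real ExecutorDecommissionState.
final case class ExecutorDecommissionState(startTimeMs: Long, workerHost: Option[String])

// A Guava cache with a hard size cap: once full, entries are evicted
// (approximately least-recently-used first), so memory stays bounded no
// matter how many decommissioned executors are eventually removed.
val executorsRemovedByDecom =
  CacheBuilder.newBuilder()
    .maximumSize(10000)
    .build[String, ExecutorDecommissionState]()

executorsRemovedByDecom.put("executor1", ExecutorDecommissionState(10000L, None))
// getIfPresent returns null when the key is absent, hence the Option wrapper.
val state = Option(executorsRemovedByDecom.getIfPresent("executor1"))
```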
### Why are the changes needed?
Get info about whether an executor was decommissioned even after that executor has been removed.
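For illustration, a hedged sketch of how this would be switched on; the first key is taken from the diff below, and the second is assumed to be the key behind `STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE`:

```scala
import org.apache.spark.SparkConf

// Note: the retention config is marked internal() in the diff below, and
// the values here are illustrative only.
val conf = new SparkConf()
  // Retain up to 10000 removed-by-decommission executors (code default: 0).
  .set("spark.scheduler.maxRetainedRemovedDecommissionExecutors", "10000")
  // Assumed key for STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.
  .set("spark.stage.ignoreDecommissionFetchFailure", "true")
```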
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Updated test in `TaskSchedulerImplSuite`
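Condensed from the `TaskSchedulerImplSuite` diff below, the scenario under test looks roughly like this (with `scheduler` built so that the retention cap is 1):

```scala
// executor1 is decommissioned and then lost; its state survives removal.
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None))
scheduler.executorLost("executor1", ExecutorExited(0, exitCausedByApp = false, "normal"))
assert(scheduler.getExecutorDecommissionState("executor1").isDefined)

// executor0 goes through the same sequence; with a cap of 1, only the most
// recently removed executor's state is retained.
scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", None))
scheduler.executorLost("executor0", ExecutorExited(0, exitCausedByApp = false, "normal"))
assert(scheduler.getExecutorDecommissionState("executor0").isDefined)
assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
```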
Closes #38441 from warrenzhu25/ignore-followup.
Authored-by: Warren Zhu <wa...@gmail.com>
Signed-off-by: Mridul <mridul<at>gmail.com>
---
.../org/apache/spark/internal/config/package.scala | 11 +++++++
.../org/apache/spark/scheduler/DAGScheduler.scala | 8 +++--
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 13 +++++++-
.../spark/scheduler/TaskSchedulerImplSuite.scala | 36 +++++++++++++++++++++-
4 files changed, 63 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ad899d7dfd6..eb6ac8b765b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2231,6 +2231,17 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val SCHEDULER_MAX_RETAINED_REMOVED_EXECUTORS =
+ ConfigBuilder("spark.scheduler.maxRetainedRemovedDecommissionExecutors")
+ .internal()
+ .doc("Max number of removed executors by decommission to retain. This affects " +
+ "whether fetch failure caused by removed decommissioned executors could be ignored " +
+ s"when ${STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key} is enabled.")
+ .version("3.4.0")
+ .intConf
+ .checkValue(_ >= 0, "needs to be a non-negative value")
+ .createWithDefault(0)
+
private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
ConfigBuilder("spark.shuffle.push.enabled")
.doc("Set to true to enable push-based shuffle on the client side and this works in " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fb3512619d8..c55d44dfd59 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2193,9 +2193,11 @@ private[spark] class DAGScheduler(
* Return true when:
* 1. Waiting for decommission start
* 2. Under decommission process
- * Return false when:
- * 1. Stopped or terminated after finishing decommission
- * 2. Under decommission process, then removed by driver with other reasons
+ * 3. Stopped or terminated after finishing decommission
+ * 4. Under decommission process, then removed by driver with other reasons
+ * Return false for cases 3 and 4 when the removed executor info is not retained.
+ * The max number of retained removed executors is controlled by
+ * spark.scheduler.maxRetainedRemovedDecommissionExecutors
*/
private[scheduler] def isExecutorDecommissioningOrDecommissioned(
taskScheduler: TaskScheduler, bmAddress: BlockManagerId): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 80b66c4f675..4580ec53289 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -26,6 +26,8 @@ import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet}
import scala.util.Random
+import com.google.common.cache.CacheBuilder
+
import org.apache.spark._
import org.apache.spark.InternalAccumulator.{input, shuffleRead}
import org.apache.spark.TaskState.TaskState
@@ -145,6 +147,13 @@ private[spark] class TaskSchedulerImpl(
// continue to run even after being asked to decommission, but they will eventually exit.
val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]
+ // Keep executors removed due to decommission, so getExecutorDecommissionState
+ // still returns the correct value even after the executor is lost
+ val executorsRemovedByDecom =
+ CacheBuilder.newBuilder()
+ .maximumSize(conf.get(SCHEDULER_MAX_RETAINED_REMOVED_EXECUTORS))
+ .build[String, ExecutorDecommissionState]()
+
def runningTasksByExecutors: Map[String, Int] = synchronized {
executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
}
@@ -1022,7 +1031,8 @@ private[spark] class TaskSchedulerImpl(
override def getExecutorDecommissionState(executorId: String)
: Option[ExecutorDecommissionState] = synchronized {
- executorsPendingDecommission.get(executorId)
+ executorsPendingDecommission.get(executorId).orElse(
+ Option(executorsRemovedByDecom.getIfPresent(executorId)))
}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
@@ -1118,6 +1128,7 @@ private[spark] class TaskSchedulerImpl(
}
executorsPendingDecommission.remove(executorId)
+ .foreach(executorsRemovedByDecom.put(executorId, _))
if (reason != LossReasonPending) {
executorIdToHost -= executorId
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index b81f85bd1d7..eec5449bc72 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1930,13 +1930,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
assert(taskSlotsForRp === 4)
}
- private def setupSchedulerForDecommissionTests(clock: Clock, numTasks: Int): TaskSchedulerImpl = {
+ private def setupSchedulerForDecommissionTests(clock: Clock, numTasks: Int,
+ extraConf: Map[String, String] = Map.empty): TaskSchedulerImpl = {
// one task per host
val numHosts = numTasks
val conf = new SparkConf()
.setMaster(s"local[$numHosts]")
.setAppName("TaskSchedulerImplSuite")
.set(config.CPUS_PER_TASK.key, "1")
+ .setAll(extraConf)
sc = new SparkContext(conf)
val maxTaskFailures = sc.conf.get(config.TASK_MAX_FAILURES)
taskScheduler = new TaskSchedulerImpl(sc, maxTaskFailures, clock = clock) {
@@ -2019,6 +2021,38 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
assert(manager.copiesRunning.take(2) === Array(1, 1))
}
+ test("SPARK-40979: Keep removed executor info due to decommission") {
+ val clock = new ManualClock(10000L)
+ val scheduler = setupSchedulerForDecommissionTests(clock, 2,
+ Map(config.SCHEDULER_MAX_RETAINED_REMOVED_EXECUTORS.key -> "1"))
+ val manager = stageToMockTaskSetManager(0)
+ // The tasks started should be running.
+ assert(manager.copiesRunning.take(2) === Array(1, 1))
+
+ // executor1 is decommissioned before being lost
+ scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None))
+ assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
+
+ // executor1 is eventually lost
+ scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
+ assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
+
+ // executor0 is decommissioned before being lost
+ scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", None))
+ scheduler.executorLost("executor0", ExecutorExited(0, false, "normal"))
+
+ // Only the last removed executor is kept, as the max number of retained removed executors is 1
+ assert(scheduler.getExecutorDecommissionState("executor0").isDefined)
+ assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
+
+ // Now give it some resources and both tasks should be rerun
+ val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(
+ WorkerOffer("executor2", "host2", 1), WorkerOffer("executor3", "host3", 1))).flatten
+ assert(taskDescriptions.size === 2)
+ assert(taskDescriptions.map(_.index).sorted == Seq(0, 1))
+ assert(manager.copiesRunning.take(2) === Array(1, 1))
+ }
+
test("SPARK-24818: test delay scheduling for barrier TaskSetManager") {
val clock = new ManualClock()
val conf = new SparkConf().set(config.LEGACY_LOCALITY_WAIT_RESET, false)