Posted to commits@spark.apache.org by wu...@apache.org on 2022/08/24 12:49:28 UTC

[spark] branch master updated: [SPARK-39957][CORE] Delay onDisconnected to enable Driver to receive ExecutorExitCode

This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e051ab7ba1 [SPARK-39957][CORE] Delay onDisconnected to enable Driver to receive ExecutorExitCode
0e051ab7ba1 is described below

commit 0e051ab7ba1366e203ffd12f80e34d900c382dd5
Author: Kai-Hsun Chen <ka...@apache.org>
AuthorDate: Wed Aug 24 20:48:57 2022 +0800

    [SPARK-39957][CORE] Delay onDisconnected to enable Driver to receive ExecutorExitCode
    
    ### What changes were proposed in this pull request?
    When onDisconnected is triggered,
    
    (1) Delay `RemoveExecutor` for 5 seconds (the default of the new `spark.standalone.executorRemoveDelayOnDisconnection` config) so that the driver can receive the ExecutorExitCode from the slow path.
    (2) Prevent the task scheduler from assigning tasks to the lost executor, by adding the executor to `executorsPendingLossReason`. (A minimal sketch of both steps follows.)
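    
    As a minimal, self-contained sketch of the two steps above (the name
    `onExecutorDisconnected` and the println are illustrative stand-ins for the
    real driver-side plumbing in `StandaloneSchedulerBackend`; see the diff
    below):
    
    ```scala
    import java.util.concurrent.{Executors, TimeUnit}
    import scala.collection.mutable
    
    object DelayedRemovalSketch {
      // Executors whose loss reason is not yet known; task offers skip these.
      private val executorsPendingLossReason = mutable.HashSet[String]()
      private val scheduler = Executors.newSingleThreadScheduledExecutor()
    
      // Invoked when the fast path (onDisconnected) fires for an executor.
      def onExecutorDisconnected(executorId: String, removeDelayMs: Long): Unit = {
        // Step (2): immediately stop offering tasks to the lost executor.
        executorsPendingLossReason += executorId
        // Step (1): give the slow path `removeDelayMs` milliseconds to deliver an
        // ExecutorExitCode before falling back to a generic removal. The real code
        // tolerates a benign race on this set; see the [Note] in the diff below.
        scheduler.schedule(new Runnable {
          override def run(): Unit = {
            if (executorsPendingLossReason.contains(executorId)) {
              // Stand-in for driverEndpoint.send(RemoveExecutor(executorId, lossReason)).
              println(s"RemoveExecutor($executorId, ExecutorProcessLost)")
            }
          }
        }, removeDelayMs, TimeUnit.MILLISECONDS)
      }
    }
    ```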
    
    ### Why are the changes needed?
    There are two methods to detect executor loss.
    
    (1) (fast path) `onDisconnected`: Executor -> Driver
    When the executor's JVM exits, the socket (Netty's channel) is closed. The driver's onDisconnected callback fires when it observes the closed channel.
    
    (2) (slow path) ExecutorRunner -> Worker -> Master -> Driver (See #37385 for details)
    When the executor exits with an ExecutorExitCode, the exit code is passed from ExecutorRunner to the Driver.
    
    Because the fast path detects executor loss without the ExecutorExitCode, the two paths may classify the same case differently. For example, when an executor exits with ExecutorExitCode HEARTBEAT_FAILURE, onDisconnected treats the loss as a task failure, while the slow path treats it as a network failure, which is the correct classification.
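    
    As an illustration of the classification this PR adds on the slow path (a
    hedged sketch, not the exact PR code; see the StandaloneSchedulerBackend
    diff below):
    
    ```scala
    import org.apache.spark.executor.ExecutorExitCode
    
    // Exit codes that signal infrastructure problems should not be charged
    // against the application's task-failure budget.
    def exitCausedByApp(exitCode: Int): Boolean = exitCode match {
      case ExecutorExitCode.HEARTBEAT_FAILURE => false               // network failure
      case ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR => false // disk/env failure
      case _ => true // any other nonzero exit code counts against the app
    }
    ```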
    
    [Notice]
    For more details about ExecutorExitCode, see #37385.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    ```bash
    build/sbt "core/testOnly *SparkContextSuite -- -z ExitCode"
    ```
    
    Closes #37400 from kevin85421/SPARK-39957.
    
    Lead-authored-by: Kai-Hsun Chen <ka...@apache.org>
    Co-authored-by: Kai-Hsun Chen <ka...@databricks.com>
    Signed-off-by: Yi Wu <yi...@databricks.com>
---
 .../org/apache/spark/internal/config/package.scala |  9 +++
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  2 +-
 .../cluster/StandaloneSchedulerBackend.scala       | 83 ++++++++++++++++++++--
 .../apache/spark/ExternalShuffleServiceSuite.scala |  2 +
 .../scala/org/apache/spark/SparkContextSuite.scala | 45 ++++++++++++
 5 files changed, 136 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 59d7acc6764..9d1a56843ca 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2413,4 +2413,13 @@ package object config {
       .version("3.3.0")
       .intConf
       .createWithDefault(5)
+
+  private[spark] val EXECUTOR_REMOVE_DELAY =
+    ConfigBuilder("spark.standalone.executorRemoveDelayOnDisconnection")
+      .internal()
+      .doc("The timeout duration for a disconnected executor to wait for the specific disconnect" +
+        "reason before it gets removed. This is only used for Standalone yet.")
+      .version("3.4.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("5s")
 }
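
The new key is internal, but it can still be set by name. A usage sketch under
that assumption ("0s" disables the delay, as the tests below do; the default
is "5s"):

```scala
import org.apache.spark.SparkConf

// Hypothetical override: give the slow path more time to report an
// ExecutorExitCode before the fast path falls back to a generic removal.
val conf = new SparkConf()
  .set("spark.standalone.executorRemoveDelayOnDisconnection", "10s")
```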
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 61d67765c8c..f775ee874d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -97,7 +97,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   private[scheduler] val executorsPendingToRemove = new HashMap[String, Boolean]
 
   // Executors that have been lost, but for which we don't yet know the real exit reason.
-  private val executorsPendingLossReason = new HashSet[String]
+  protected val executorsPendingLossReason = new HashSet[String]
 
   // Executors which are being decommissioned. Maps from executorId to workerHost.
   protected val executorsPendingDecommission = new HashMap[String, Option[String]]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index befc59b8216..9f71a0fe58c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import java.util.Locale
-import java.util.concurrent.Semaphore
+import java.util.concurrent.{Semaphore, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.concurrent.Future
@@ -26,13 +26,16 @@ import scala.concurrent.Future
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
+import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.EXECUTOR_REMOVE_DELAY
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.resource.ResourceProfile
-import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress}
 import org.apache.spark.scheduler._
-import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * A [[SchedulerBackend]] implementation for Spark's standalone cluster manager.
@@ -61,6 +64,10 @@ private[spark] class StandaloneSchedulerBackend(
   private val totalExpectedCores = maxCores.getOrElse(0)
   private val defaultProf = sc.resourceProfileManager.defaultResourceProfile
 
+  private val executorDelayRemoveThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-executor-delay-remove-thread")
+  private val _executorRemoveDelay = conf.get(EXECUTOR_REMOVE_DELAY)
+
   override def start(): Unit = {
     super.start()
 
@@ -175,8 +182,13 @@ private[spark] class StandaloneSchedulerBackend(
       exitStatus: Option[Int],
       workerHost: Option[String]): Unit = {
     val reason: ExecutorLossReason = exitStatus match {
+      case Some(ExecutorExitCode.HEARTBEAT_FAILURE) =>
+        ExecutorExited(ExecutorExitCode.HEARTBEAT_FAILURE, exitCausedByApp = false, message)
+      case Some(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) =>
+        ExecutorExited(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR,
+          exitCausedByApp = false, message)
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
-      case None => ExecutorProcessLost(message, workerHost)
+      case None => ExecutorProcessLost(message, workerHost, causedByApp = workerHost.isEmpty)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)
@@ -257,6 +269,7 @@ private[spark] class StandaloneSchedulerBackend(
   private def stop(finalState: SparkAppHandle.State): Unit = {
     if (stopping.compareAndSet(false, true)) {
       try {
+        executorDelayRemoveThread.shutdownNow()
         super.stop()
         if (client != null) {
           client.stop()
@@ -272,4 +285,66 @@ private[spark] class StandaloneSchedulerBackend(
     }
   }
 
+  override def createDriverEndpoint(): DriverEndpoint = {
+    new StandaloneDriverEndpoint()
+  }
+
+  private class StandaloneDriverEndpoint extends DriverEndpoint {
+    // [SC-104659]: There are two paths to detect executor loss.
+    // (1) (fast path) `onDisconnected`: Executor -> Driver
+    //     When the executor's JVM exits, the socket (Netty's channel) is closed. The
+    //     onDisconnected callback fires when the driver observes the closed channel.
+    //
+    // (2) (slow path) ExecutorRunner -> Worker -> Master -> Driver
+    //     When the executor exits with an ExecutorExitCode, the exit code is passed from
+    //     ExecutorRunner to the Driver. (See the [SC-104335] PR for details.)
+    //
+    // Both paths call `removeExecutor` to remove the lost executor. The main difference
+    // between them is the ExecutorExitCode: the slow path's ExecutorLossReason carries the
+    // ExecutorExitCode, while the fast path's does not. Hence, the slow path can classify
+    // the executor loss with more information.
+    //
+    // Typically, the fast path fires before the slow path. That is, by the time the driver
+    // receives the ExecutorExitCode from the slow path, the fast path has already removed the
+    // lost executor from executorDataMap. Hence, when onDisconnected is triggered, we delay
+    // sending RemoveExecutor(executorId, lossReason) by _executorRemoveDelay milliseconds,
+    // hoping to receive the ExecutorExitCode from the slow path during the delay.
+    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+      addressToExecutorId.get(remoteAddress).foreach { executorId =>
+        // [SC-104659]:
+        // When the driver detects executor loss via the fast path (`onDisconnected`), we must
+        // stop the task scheduler from assigning new tasks to the lost executor while waiting
+        // `_executorRemoveDelay` milliseconds for the slow path. To that end, we add the
+        // executor to `executorsPendingLossReason`, so that it is filtered out of
+        // `activeExecutors` in `getWorkerOffers`.
+        executorsPendingLossReason += executorId
+        val lossReason = ExecutorProcessLost("Remote RPC client disassociated. Likely due to " +
+          "containers exceeding thresholds, or network issues. Check driver logs for WARN " +
+          "messages.")
+        val removeExecutorTask = new Runnable() {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            // If the executor has not been removed by the slow path, the fast path sends a
+            // `RemoveExecutor` message to the scheduler backend.
+            //
+            // [Note]: There may be a race condition here, because the standalone scheduler
+            //         touches `executorsPendingLossReason` in the following 3 places:
+            //
+            //  1. `removeExecutor`: executorsPendingLossReason -= executorId (remove)
+            //  2. `onDisconnected`: executorsPendingLossReason += executorId (add)
+            //  3. `executorDelayRemoveThread`: executorsPendingLossReason.contains(executorId)
+            //
+            // Cases 1 & 3 may race, and so may cases 2 & 3. However, the race is benign,
+            // because `removeExecutor` checks whether the executor still exists; if the
+            // executor has already been removed, the extra `RemoveExecutor` message has
+            // no effect.
+            if (executorsPendingLossReason.contains(executorId)) {
+              driverEndpoint.send(RemoveExecutor(executorId, lossReason))
+            }
+          }
+        }
+        executorDelayRemoveThread.schedule(removeExecutorTask,
+          _executorRemoveDelay, TimeUnit.MILLISECONDS)
+      }
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 1ca78d572c7..ef75a7e5e77 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -111,6 +111,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
     val confWithRddFetchEnabled = conf.clone
       .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
       .set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+      .set(config.EXECUTOR_REMOVE_DELAY.key, "0s")
     sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled)
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
@@ -183,6 +184,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
       val confWithLocalDiskReading = conf.clone
         .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
         .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, enabled)
+        .set(config.EXECUTOR_REMOVE_DELAY.key, "0s")
       sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading)
       sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
       sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index c64a4371911..f9869d35382 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -37,6 +37,7 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.matchers.must.Matchers._
 
 import org.apache.spark.TestUtils._
+import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests._
 import org.apache.spark.internal.config.UI._
@@ -1354,6 +1355,50 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }.getMessage
     assert(msg.contains("Cannot use the keyword 'proxy' or 'history' in reverse proxy URL"))
   }
+
+  test("SPARK-39957: ExitCode HEARTBEAT_FAILURE should be counted as network failure") {
+    // This test proves that the driver receives the ExecutorExitCode before onDisconnected
+    // removes the executor. If the executor were removed by onDisconnected, the executor loss
+    // would be counted as a task failure, and Spark would throw a SparkException because
+    // TASK_MAX_FAILURES is 1. When the driver instead removes the executor with exit code
+    // HEARTBEAT_FAILURE, the loss counts as a network failure and the job should not fail.
+
+    val conf = new SparkConf().set(TASK_MAX_FAILURES, 1)
+    val sc = new SparkContext("local-cluster[1, 1, 1024]", "test-exit-code-heartbeat", conf)
+    val result = sc.parallelize(1 to 10, 1).map { x =>
+      val context = org.apache.spark.TaskContext.get()
+      if (context.taskAttemptId() == 0) {
+        System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
+      } else {
+        x
+      }
+    }.count()
+    assert(result == 10L)
+    sc.stop()
+  }
+
+  test("SPARK-39957: ExitCode HEARTBEAT_FAILURE will be counted as task failure when" +
+    "EXECUTOR_REMOVE_DELAY is disabled") {
+    // If the executor is removed by onDisconnected, the executor loss is counted as a task
+    // failure, and Spark throws a SparkException because TASK_MAX_FAILURES is 1.
+
+    val conf = new SparkConf().set(TASK_MAX_FAILURES, 1).set(EXECUTOR_REMOVE_DELAY.key, "0s")
+    val sc = new SparkContext("local-cluster[1, 1, 1024]", "test-exit-code-heartbeat", conf)
+    eventually(timeout(30.seconds), interval(1.seconds)) {
+      val e = intercept[SparkException] {
+        sc.parallelize(1 to 10, 1).map { x =>
+          val context = org.apache.spark.TaskContext.get()
+          if (context.taskAttemptId() == 0) {
+            System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
+          } else {
+            x
+          }
+        }.count()
+      }
+      assert(e.getMessage.contains("Remote RPC client disassociated"))
+    }
+    sc.stop()
+  }
 }
 
 object SparkContextSuite {

