Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/08 22:59:45 UTC

[GitHub] [spark] kevin85421 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

kevin85421 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r940733267


##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -199,41 +222,131 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
     removeExecutor(executorRemoved.executorId)
   }
 
+  private def killExecutor(executorId: String, timeout: Long): Unit = {
+    logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+      s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+    killExecutorThread.submit(new Runnable {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        // Note: we want to get an executor back after expiring this one,
+        // so do not simply call `sc.killExecutor` here (SPARK-8119)
+        sc.killAndReplaceExecutor(executorId)
+        // SPARK-27348: in case of the executors which are not gracefully shut down,
+        // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+        // here to guarantee two things:
+        // 1) explicitly remove executor information from CoarseGrainedSchedulerBackend for
+        //    a lost executor instead of waiting for disconnect message
+        // 2) call scheduler.executorLost() underlying to fail any tasks assigned to
+        //    those executors to avoid app hang
+        sc.schedulerBackend match {
+          case backend: CoarseGrainedSchedulerBackend =>
+            backend.driverEndpoint.send(RemoveExecutor(executorId,
+              ExecutorProcessLost(
+                s"Executor heartbeat timed out after ${timeout} ms",
+                causedByApp = !sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT))))
+
+          // LocalSchedulerBackend is used locally and only has one single executor
+          case _: LocalSchedulerBackend =>
+
+          case other => throw new UnsupportedOperationException(
+            s"Unknown scheduler backend: ${other.getClass}")
+        }
+      }
+    })
+  }
+
   private def expireDeadHosts(): Unit = {
+  /**
+   * [SC-105641]
+   * Originally, the driver's HeartbeatReceiver expires an executor if it does not receive any
+   * heartbeat from that executor for 120 seconds. 120 seconds is too long, but lowering the
+   * threshold naively creates other problems: when an executor is performing a full GC, it cannot
+   * send or reply to any messages. The following paragraphs describe the solution for detecting a
+   * network disconnection between driver and executor within a short time.
+   *
+   * An executor runs on a worker but in a different JVM, and a driver runs on a master but in a
+   * different JVM. Hence, the driver-to-executor connection and the master-to-worker connection
+   * go over the same network. Because the executor and the worker run in different JVMs, the
+   * worker can still send heartbeats to the master while the executor is performing GC.
+   *
+   * With the new HeartbeatReceiver, if the driver does not receive any heartbeat from an executor
+   * for `executorTimeoutMs` (default: 60s), HeartbeatReceiver sends a request to the master asking
+   * for the latest heartbeat from the worker that the executor runs on (`workerLastHeartbeat`).
+   * With this information, HeartbeatReceiver can determine whether the heartbeat loss is caused by
+   * network issues or by other issues (e.g. GC). If the heartbeat loss is not caused by network
+   * issues, HeartbeatReceiver puts the executor into a waitingList rather than expiring it
+   * immediately.
+   *
+   * [Note]: Definition of `network issues`
+   * Here, `network issues` means issues that are directly related to the network. If the network
+   * is still connected, an issue is not counted as a `network issue`. For example, when an
+   * executor's JVM is shut down by a problematic task, the JVM notifies the driver that the socket
+   * is closed. If the network is connected, the driver receives the notification and triggers
+   * `onDisconnected`. This is not a `network issue` because the network itself is connected.
+   *
+   * [Warning 1]
+   * The worker sends heartbeats to the master every (conf.get(WORKER_TIMEOUT) * 1000 / 4)
+   * milliseconds; see deploy/worker/Worker.scala for details. This new mechanism is designed
+   * under the assumption that (executorTimeoutMs / 2) > (conf.get(WORKER_TIMEOUT) * 1000 / 4).
+   *
+   * [Warning 2]
+   * Not every deployment mode schedules the driver on the master.
+   */
     logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
     val now = clock.getTimeMillis()
-    for ((executorId, lastSeenMs) <- executorLastSeen) {
-      if (now - lastSeenMs > executorTimeoutMs) {
-        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
-          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-        // Asynchronously kill the executor to avoid blocking the current thread
-        killExecutorThread.submit(new Runnable {
-          override def run(): Unit = Utils.tryLogNonFatalError {
-            // Note: we want to get an executor back after expiring this one,
-            // so do not simply call `sc.killExecutor` here (SPARK-8119)
-            sc.killAndReplaceExecutor(executorId)
-            // SPARK-27348: in case of the executors which are not gracefully shut down,
-            // we should remove lost executors from CoarseGrainedSchedulerBackend manually
-            // here to guarantee two things:
-            // 1) explicitly remove executor information from CoarseGrainedSchedulerBackend for
-            //    a lost executor instead of waiting for disconnect message
-            // 2) call scheduler.executorLost() underlying to fail any tasks assigned to
-            //    those executors to avoid app hang
-            sc.schedulerBackend match {
-              case backend: CoarseGrainedSchedulerBackend =>
-                backend.driverEndpoint.send(RemoveExecutor(executorId,
-                  ExecutorProcessLost(
-                    s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
-
-              // LocalSchedulerBackend is used locally and only has one single executor
-              case _: LocalSchedulerBackend =>
-
-              case other => throw new UnsupportedOperationException(
-                s"Unknown scheduler backend: ${other.getClass}")
-            }
+    if (!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)) {

Review Comment:
   Resolved.
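
   [Editor's note] For readers skimming the archive, the comment block in the hunk above boils down to one extra check before expiry. Below is a minimal, self-contained sketch of that decision, not the PR's actual code; the name shouldExpire, its parameters, and the executorTimeoutMs / 2 threshold for the worker heartbeat are illustrative assumptions. With the standalone default WORKER_TIMEOUT of 60s, the worker heartbeats roughly every 15s, so the [Warning 1] assumption holds for a 60s executor timeout: executorTimeoutMs / 2 = 30s > 15s.

   object HeartbeatExpirySketch {
     // Hedged sketch of the decision described in the comment above; all names
     // and the worker-heartbeat threshold are illustrative, not the PR's code.
     def shouldExpire(
         now: Long,
         executorLastSeenMs: Long,
         workerLastHeartbeatMs: Long,
         executorTimeoutMs: Long,
         checkWorkerLastHeartbeat: Boolean): Boolean = {
       if (now - executorLastSeenMs <= executorTimeoutMs) {
         false // executor heartbeat is still fresh; nothing to do
       } else if (!checkWorkerLastHeartbeat) {
         true // old behavior: expire as soon as the executor times out
       } else {
         // New behavior: consult the worker's last heartbeat. If the worker is
         // also silent, the loss looks like a network issue and we expire; if
         // the worker is healthy, the executor is likely in a long GC, and the
         // real PR defers expiry by putting it into a waitingList instead.
         now - workerLastHeartbeatMs > executorTimeoutMs / 2
       }
     }

     def main(args: Array[String]): Unit = {
       val now = 100000L
       // Executor silent for 70s but the worker heartbeated 10s ago: defer (GC case).
       println(shouldExpire(now, now - 70000, now - 10000, 60000, checkWorkerLastHeartbeat = true))
       // Executor silent for 70s and the worker silent for 40s: expire (network case).
       println(shouldExpire(now, now - 70000, now - 40000, 60000, checkWorkerLastHeartbeat = true))
     }
   }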



##########
core/src/main/scala/org/apache/spark/internal/config/Network.scala:
##########
@@ -49,7 +49,13 @@ private[spark] object Network {
     ConfigBuilder("spark.network.timeoutInterval")
       .version("1.3.2")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
+      .createWithDefaultString("15s")
+
+  private[spark] val NETWORK_EXECUTOR_TIMEOUT =
+    ConfigBuilder("spark.network.executorTimeout")
+      .version("1.3.0")

Review Comment:
   Resolved.
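
   [Editor's note] As a usage note on the hunk above: the diff pins spark.network.timeoutInterval to a 15s default instead of inheriting it from the block manager timeout interval, and introduces a spark.network.executorTimeout entry. A minimal sketch of how an application might override these keys, assuming only the standard SparkConf API (the chosen values are illustrative):

   import org.apache.spark.SparkConf

   object TimeoutConfSketch {
     def main(args: Array[String]): Unit = {
       // Illustrative overrides of the keys touched by this hunk.
       val conf = new SparkConf()
         .setAppName("heartbeat-timeout-demo")
         // How often the driver checks for executors with expired heartbeats.
         .set("spark.network.timeoutInterval", "15s")
         // How long an executor may stay silent before it is considered lost.
         .set("spark.network.executorTimeout", "60s")
       println(conf.get("spark.network.executorTimeout"))
     }
   }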



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

