Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/12/27 05:56:35 UTC

[GitHub] [spark] jiangxb1987 commented on a change in pull request #26980: [SPARK-27348][CORE] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend

jiangxb1987 commented on a change in pull request #26980: [SPARK-27348][CORE] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
URL: https://github.com/apache/spark/pull/26980#discussion_r361587855
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
 ##########
 @@ -199,14 +202,28 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
       if (now - lastSeenMs > executorTimeoutMs) {
         logWarning(s"Removing executor $executorId with no recent heartbeats: " +
           s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-          s"timed out after ${now - lastSeenMs} ms"))
-          // Asynchronously kill the executor to avoid blocking the current thread
+        // Asynchronously kill the executor to avoid blocking the current thread
         killExecutorThread.submit(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
             // Note: we want to get an executor back after expiring this one,
             // so do not simply call `sc.killExecutor` here (SPARK-8119)
             sc.killAndReplaceExecutor(executorId)
+            // SPARK-27348: in case of the executors which are not gracefully shut down,
+            // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+            // here to guarantee two things:
+            // 1) call scheduler.executorLost() underlying to fail any tasks assigned to
+            //    those executors to avoid app hang
+            // 2) always remove executor information from CoarseGrainedSchedulerBackend for
+            //    a lost executor
+            sc.schedulerBackend match {
+              case backend: CoarseGrainedSchedulerBackend =>
+                backend.driverEndpoint.send(RemoveExecutor(executorId,
+                  SlaveLost(s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
+              case _: LocalSchedulerBackend =>
+                  // LocalSchedulerBackend is used locally and only has one single executor
+              case _ => throw new UnsupportedOperationException(
 
 Review comment:
   nit: bind the fallback case to a name so the exception message can report which backend was hit:
   ```
   case other => throw new UnsupportedOperationException(s"Unknown scheduler backend: ${other.getClass}")
   ```
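
   For illustration only, here is a minimal self-contained sketch of the named-fallback pattern suggested above. The trait and classes (`SchedulerBackend`, `CoarseGrainedBackend`, `LocalBackend`, `CustomBackend`) and the helper `handleExpiredExecutor` are hypothetical stand-ins, not Spark's real types, and the returned strings only describe what the real code would do:

   ```scala
   // Hypothetical stand-ins for Spark's scheduler backends (NOT the real classes).
   trait SchedulerBackend
   class CoarseGrainedBackend extends SchedulerBackend
   class LocalBackend extends SchedulerBackend
   class CustomBackend extends SchedulerBackend

   object BackendMatchSketch {
     // Returns a description of the action taken for an expired executor.
     // The real change sends RemoveExecutor to the driver endpoint instead.
     def handleExpiredExecutor(backend: SchedulerBackend, executorId: String): String =
       backend match {
         case _: CoarseGrainedBackend =>
           s"remove executor $executorId"
         case _: LocalBackend =>
           // A local backend has a single executor, so there is nothing to remove.
           "nothing to do"
         case other =>
           // Naming the fallback lets the message say which backend was unexpected.
           throw new UnsupportedOperationException(
             s"Unknown scheduler backend: ${other.getClass}")
       }

     def main(args: Array[String]): Unit = {
       println(handleExpiredExecutor(new CoarseGrainedBackend, "1"))
       try handleExpiredExecutor(new CustomBackend, "1")
       catch { case e: UnsupportedOperationException => println(e.getMessage) }
     }
   }
   ```

   With `case _ =>` the matched value is discarded, so the message cannot include the class; with `case other =>` whoever reads the stack trace can see which backend needs handling.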
