Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/02 06:27:03 UTC

[GitHub] [spark] kevin85421 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

kevin85421 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r961328575


##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
   private[spark] var scheduler: TaskScheduler = null
 
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection of `executorLastSeen` and `executorExpiryCandidates` is
+   * always an empty set. If it is not, it is possible that an executor is never killed
+   * until it recovers: when an executor is in both `executorLastSeen` and
+   * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates`
+   * may keep being updated as long as the worker sends heartbeats to the master normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   * `executorExpiryCandidates`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * When the driver does not receive any heartbeat from an executor for `executorTimeoutMs`
+   * milliseconds, it will ask the master for the last heartbeat from the worker on which the
+   * executor is running.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val executorExpiryCandidates = new HashMap[String, Long]
 
   private val executorTimeoutMs = sc.conf.get(
     config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  ).getOrElse(
+    sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match {
+      case Some(executorTimeout) => executorTimeout
+      case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")
+    }
+  )
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
 
+  /**
+   * Currently, [SPARK-39984] only applies to StandaloneSchedulerBackend.
+   *
+   * `checkWorkerLastHeartbeat`: a flag to enable the two-phase executor timeout.
+   * `expiryCandidatesTimeout`: the timeout used for `executorExpiryCandidates`.
+   */
+  private val checkWorkerLastHeartbeat = {
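
For illustration, a minimal sketch of the bookkeeping the doc comment above prescribes (the `demoteToCandidate` helper is hypothetical, not code from this PR): moving an entry between the maps, rather than copying it, keeps their key sets disjoint, so a worker that keeps heartbeating normally cannot postpone the expiry decision forever.

```scala
// Sketch only: hypothetical helper illustrating the disjointness invariant
// between executorLastSeen and executorExpiryCandidates.
import scala.collection.mutable.HashMap

object ExpiryBookkeepingSketch {
  private val executorLastSeen = new HashMap[String, Long]
  private val executorExpiryCandidates = new HashMap[String, Long]

  // When an executor misses its heartbeat deadline, move (not copy) it into
  // the candidate map, keyed by the worker's last heartbeat timestamp.
  def demoteToCandidate(executorId: String, workerLastHeartbeat: Long): Unit = {
    executorLastSeen.remove(executorId)
    executorExpiryCandidates(executorId) = workerLastHeartbeat
  }

  def main(args: Array[String]): Unit = {
    executorLastSeen("exec-1") = System.currentTimeMillis()
    demoteToCandidate("exec-1", System.currentTimeMillis())
    // The invariant holds: no executor ID appears in both maps.
    assert(executorLastSeen.keySet.intersect(executorExpiryCandidates.keySet).isEmpty)
  }
}
```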

Review Comment:
   Let me clarify the definition of "initial delay". In my understanding, the "initial delay" above means the second argument of `scheduleAtFixedRate` ([Link](https://sourcegraph.com/github.com/apache/spark@b012cb7/-/blob/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala?L107)). Is that correct?
   
   ```scala
   override def onStart(): Unit = {
     timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
       () => Utils.tryLogNonFatalError { Option(self).foreach(_.ask[Boolean](ExpireDeadHosts)) },
       0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
   }
   ```
   
   I agree with you that an initial delay can significantly decrease the probability of the first `expireDeadHosts` trigger firing before the scheduler backend is initialized. However, it cannot completely prevent this edge case. Hence, my thought is to define `checkWorkerLastHeartbeat` as a function, so that it is re-evaluated on every call and the edge case is avoided entirely. Does that make sense? See the sketch below.
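   
   For illustration, here is a minimal sketch (names hypothetical, not from this PR) of why a `def` sidesteps the initialization-order issue while a `val` does not: a `val` is evaluated once at construction time, possibly before the scheduler backend exists, whereas a `def` is re-evaluated on every call.
   
   ```scala
   object ValVsDefSketch {
     // Stand-in for sc.schedulerBackend, which is set after construction.
     var schedulerBackend: Option[String] = None
   
     // Evaluated once at object initialization: permanently captures `false`.
     val checkAsVal: Boolean = schedulerBackend.isDefined
   
     // Evaluated on each call: reflects the current state.
     def checkAsDef: Boolean = schedulerBackend.isDefined
   
     def main(args: Array[String]): Unit = {
       schedulerBackend = Some("StandaloneSchedulerBackend")
       println(s"val: $checkAsVal")  // val: false
       println(s"def: $checkAsDef")  // def: true
     }
   }
   ```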



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

