Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/03 01:12:46 UTC

[GitHub] [spark] kevin85421 opened a new pull request, #37385: [SPARK-39956][CORE] Determine task failures based on ExecutorExitCode

kevin85421 opened a new pull request, #37385:
URL: https://github.com/apache/spark/pull/37385

   ### What changes were proposed in this pull request?
   ### Q1. How many ExecutorExitCode do we have?
   Based on ExecutorExitCode.scala, we found four ExecutorExitCode values.
   * `DISK_STORE_FAILED_TO_CREATE_DIR`: Each Executor holds a BlockManager (`env.blockManager`), which creates a DiskBlockManager when it initializes. On creation, the DiskBlockManager tries to create its local directories. As shown in the following code snippet, if no local directory can be created, the DiskBlockManager shuts down the JVM with the exit code `DISK_STORE_FAILED_TO_CREATE_DIR`.
   ```scala
   private[spark] val localDirs: Array[File] = createLocalDirs(conf)
   if (localDirs.isEmpty) {
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
   ```
   
   * `EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE`: Unused
   * `EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR`: Unused
   * `HEARTBEAT_FAILURE`: When an Executor is created, it starts a Heartbeater thread that periodically runs the function `reportHeartBeat()`, which sends a heartbeat to the driver. If the heartbeater does not receive a response in time, it increments `heartbeatFailures` by one. When `heartbeatFailures` is greater than or equal to `HEARTBEAT_MAX_FAILURES`, the heartbeater shuts down the JVM with the exit code `ExecutorExitCode.HEARTBEAT_FAILURE`.
   
   ```scala
   if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
     logError(s"Exit as unable to send heartbeats to driver " +
       s"more than $HEARTBEAT_MAX_FAILURES times")
     System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
   }
   ```
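
   The counting behavior described above can be sketched in isolation. This is a minimal illustration, not Spark's code: `HeartbeatFailureCounter` and `HeartbeatSketch` are hypothetical names, and resetting the counter after a successful heartbeat is an assumption that the description above does not state.

   ```scala
   // Hypothetical stand-in for the executor's heartbeat bookkeeping (not a Spark class).
   final class HeartbeatFailureCounter(maxFailures: Int) {
     private var failures = 0

     /** Records one heartbeat attempt; returns true once the executor should exit. */
     def record(succeeded: Boolean): Boolean = {
       if (succeeded) failures = 0 // assumption: a successful heartbeat resets the count
       else failures += 1
       failures >= maxFailures
     }
   }

   object HeartbeatSketch {
     def main(args: Array[String]): Unit = {
       val counter = new HeartbeatFailureCounter(maxFailures = 3)
       // false = no response in time, true = response received
       val outcomes = Seq(false, false, true, false, false, false)
       val exitAt = outcomes.indexWhere(ok => counter.record(ok))
       println(s"exit would be triggered at attempt index $exitAt")
     }
   }
   ```

   In this sketch the exit condition is reached only after three consecutive failures, because the one success in between resets the count.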
   
   ### Q2. How does ExecutorExitCode pass to driver?
   In ExecutorRunner, an Executor is launched as a new process, so it runs in its own JVM. When the Executor calls `System.exit(...)` to shut down that JVM, the exit code is returned to ExecutorRunner, which then sends it to the Worker.
   
   * A high-level overview: **ExecutorRunner -> Worker -> Master -> Driver**
   
   The steps below describe in more detail how the Executor exit code is passed from ExecutorRunner to the Driver.
   
   1. ExecutorRunner sends an `ExecutorStateChanged` message to Worker.
   ```scala
   val exitCode = process.waitFor()
   state = ExecutorState.EXITED
   val message = "Command exited with code " + exitCode
   worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
   ```
   
   2. When Worker receives the `ExecutorStateChanged` message, Worker will forward the message to Master.
   ```scala
   syncExecutorStateWithMaster(executorStateChanged)
   ```
   
   3. When Master receives the `ExecutorStateChanged` message, it will use the exit code to create an `ExecutorUpdated` message and send the message to StandaloneAppClient (Driver).
   ```scala
   exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None))
   ```
   
   4. When StandaloneAppClient receives the `ExecutorUpdated` message, it calls the function `executorRemoved` (a function of the SchedulerBackend), which assumes that every executor exit is caused by a task failure (`exitCausedByApp = true`).
   ```scala
   listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost)
   ```
   
   ```scala
   override def executorRemoved(
       fullId: String,
       message: String,
       exitStatus: Option[Int],
       workerHost: Option[String]): Unit = {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
       case None => ExecutorProcessLost(message, workerHost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)
   }
   ```
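
   The four hops above can be condensed into a small, self-contained sketch. The case classes here are simplified, hypothetical stand-ins for Spark's messages (the real ones carry more fields, such as the application id and executor state), and the function names are illustrative only. The sketch just shows that the exit code travels unchanged as an `Option[Int]` from the process to the driver.

   ```scala
   object ExitCodeRelaySketch {
     // Simplified, hypothetical versions of the messages named in the steps above.
     final case class ExecutorStateChanged(execId: Int, message: Option[String], exitStatus: Option[Int])
     final case class ExecutorUpdated(execId: Int, message: Option[String], exitStatus: Option[Int])

     // Step 1: ExecutorRunner wraps the process exit code in a state-change message.
     def runnerReport(execId: Int, exitCode: Int): ExecutorStateChanged =
       ExecutorStateChanged(execId, Some("Command exited with code " + exitCode), Some(exitCode))

     // Step 2: Worker forwards the message to Master unchanged.
     def workerForward(msg: ExecutorStateChanged): ExecutorStateChanged = msg

     // Step 3: Master builds the message that is sent to the driver.
     def masterTranslate(msg: ExecutorStateChanged): ExecutorUpdated =
       ExecutorUpdated(msg.execId, msg.message, msg.exitStatus)

     // Step 4: the driver side reads the exit status it was handed.
     def driverExitStatus(msg: ExecutorUpdated): Option[Int] = msg.exitStatus

     def main(args: Array[String]): Unit = {
       val seenByDriver = driverExitStatus(masterTranslate(workerForward(runnerReport(execId = 1, exitCode = 56))))
       println(s"driver sees exit status: $seenByDriver")
     }
   }
   ```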
   
   ### Q3. How to determine `exitCausedByApp` based on ExecutorExitCode?
   * `DISK_STORE_FAILED_TO_CREATE_DIR` => hardware failure => `exitCausedByApp = false`
   * `HEARTBEAT_FAILURE` => network failure => `exitCausedByApp = false`
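
   As a rough sketch of this mapping (not the code in this PR), the decision could be expressed as a pattern match on the exit code. The object name, the constant names, and the numeric values below are assumptions for illustration; the authoritative values are defined in ExecutorExitCode.scala. Any code not listed falls back to the existing behavior (`exitCausedByApp = true`).

   ```scala
   object ExitCauseSketch {
     // Assumed numeric values, for illustration only; see ExecutorExitCode.scala.
     val DiskStoreFailedToCreateDir = 53
     val HeartbeatFailure = 56

     /** Returns false when the exit code points at a hardware or network failure. */
     def exitCausedByApp(exitCode: Int): Boolean = exitCode match {
       case DiskStoreFailedToCreateDir => false // hardware failure
       case HeartbeatFailure           => false // network failure
       case _                          => true  // keep the current default for all other codes
     }

     def main(args: Array[String]): Unit = {
       Seq(DiskStoreFailedToCreateDir, HeartbeatFailure, 1).foreach { code =>
         println(s"exit code $code -> exitCausedByApp = ${exitCausedByApp(code)}")
       }
     }
   }
   ```

   In `executorRemoved`, the hard-coded `exitCausedByApp = true` would then be replaced by a call to such a function.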
   
   ### Why are the changes needed?
   An executor can exit for many reasons. However, the driver assumes that every executor exit is caused by a task failure. As shown in the following code snippet, `StandaloneSchedulerBackend` treats the executor exit as caused by a task failure (`exitCausedByApp = true`) regardless of the value of the ExecutorExitCode (`code`).
   
   ```scala
   override def executorRemoved(
       fullId: String,
       message: String,
       exitStatus: Option[Int],
       workerHost: Option[String]): Unit = {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
       case None => ExecutorProcessLost(message, workerHost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)
   }
   ```
   
   This assumption is wrong. For example, when the DiskBlockManager on an Executor fails to create a directory, it shuts down the executor’s JVM with the exit code `DISK_STORE_FAILED_TO_CREATE_DIR`. When the driver receives this exit code, the executor exit was clearly caused by a hardware failure rather than a task failure. Hence, this pull request aims to determine from the ExecutorExitCode whether an executor exit was caused by a task failure.
   
   To address this issue, we need to answer three questions:
   
   * Q1. How many ExecutorExitCode do we have?
   * Q2. How does ExecutorExitCode pass to driver?
   * Q3. How to determine `exitCausedByApp` based on ExecutorExitCode?
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   [TODO] We need to test this PR with the support of another PR.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #37385: [SPARK-39956][CORE] Determine task failures based on ExecutorExitCode

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #37385:
URL: https://github.com/apache/spark/pull/37385#issuecomment-1204764907

   Can one of the admins verify this patch?



[GitHub] [spark] kevin85421 commented on pull request #37385: [SPARK-39956][CORE] Determine task failures based on ExecutorExitCode

Posted by GitBox <gi...@apache.org>.
kevin85421 commented on PR #37385:
URL: https://github.com/apache/spark/pull/37385#issuecomment-1203373820

   cc. @Ngone51 @jiangxb1987 



[GitHub] [spark] kevin85421 commented on pull request #37385: [SPARK-39956][CORE] Determine task failures based on ExecutorExitCode

Posted by GitBox <gi...@apache.org>.
kevin85421 commented on PR #37385:
URL: https://github.com/apache/spark/pull/37385#issuecomment-1204547388

   This PR is totally covered by #37400. Closed.



[GitHub] [spark] kevin85421 closed pull request #37385: [SPARK-39956][CORE] Determine task failures based on ExecutorExitCode

Posted by GitBox <gi...@apache.org>.
kevin85421 closed pull request #37385: [SPARK-39956][CORE] Determine task failures based on ExecutorExitCode
URL: https://github.com/apache/spark/pull/37385

