You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/02/29 20:02:48 UTC

spark git commit: [SPARK-13522][CORE] Executor should kill itself when it's unable to heartbeat to driver more than N times

Repository: spark
Updated Branches:
  refs/heads/master bc65f60ef -> 17a253cbf


[SPARK-13522][CORE] Executor should kill itself when it's unable to heartbeat to driver more than N times

## What changes were proposed in this pull request?

Sometimes the network disconnection event is never triggered, for example because of race conditions we have not anticipated. In that case the executor keeps sending heartbeats to the driver and never exits.

This PR adds a new configuration, `spark.executor.heartbeat.maxFailures` (default: 60). With it, the executor kills itself when it fails to send heartbeats to the driver that many times in a row.

## How was this patch tested?

Unit tests.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #11401 from zsxwing/SPARK-13522.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17a253cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17a253cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17a253cb

Branch: refs/heads/master
Commit: 17a253cbf4712dbeab06c454b5142917a1bba78b
Parents: bc65f60
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Mon Feb 29 11:02:45 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Feb 29 11:02:45 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    | 22 +++++++++++++++++++-
 .../spark/executor/ExecutorExitCode.scala       |  8 +++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/17a253cb/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a602fca..86c121f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -114,6 +114,19 @@ private[spark] class Executor(
   private val heartbeatReceiverRef =
     RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
 
+  /**
+   * When an executor fails to send heartbeats to the driver `HEARTBEAT_MAX_FAILURES` times in a
+   * row, it should kill itself. The default value is 60, which means we keep retrying for about
+   * 10 minutes with a 10s heartbeat interval.
+   */
+  private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
+
+  /**
+   * Number of consecutive heartbeat failures. It should only be accessed in the heartbeat thread.
+   * Each successful heartbeat resets it to 0.
+   */
+  private var heartbeatFailures = 0
+
   startDriverHeartbeater()
 
   def launchTask(
@@ -461,8 +474,15 @@ private[spark] class Executor(
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
       }
+      heartbeatFailures = 0
     } catch {
-      case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
+      case NonFatal(e) =>
+        logWarning("Issue communicating with driver in heartbeater", e)
+        heartbeatFailures += 1
+        if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
+          logError(s"Unable to send heartbeats to driver more than $HEARTBEAT_MAX_FAILURES times")
+          System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/17a253cb/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index ea36fb6..99858f7 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -39,6 +39,12 @@ object ExecutorExitCode {
   /** ExternalBlockStore failed to create a local temporary directory after many attempts. */
   val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
 
+  /**
+   * Executor failed to send heartbeats to the driver
+   * "spark.executor.heartbeat.maxFailures" times in a row.
+   */
+  val HEARTBEAT_FAILURE = 56
+
   def explainExitCode(exitCode: Int): String = {
     exitCode match {
       case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -51,6 +57,8 @@ object ExecutorExitCode {
       // TODO: replace external block store with concrete implementation name
       case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
         "ExternalBlockStore failed to create a local temporary directory."
+      case HEARTBEAT_FAILURE =>
+        "Unable to send heartbeats to driver."
       case _ =>
         "Unknown executor exit code (" + exitCode + ")" + (
           if (exitCode > 128) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org