You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ad...@apache.org on 2014/05/30 19:11:11 UTC
git commit: [SPARK-1901] worker should make sure executor has exited before updating executor's info
Repository: spark
Updated Branches:
refs/heads/branch-1.0 80721fb45 -> 1696a4470
[SPARK-1901] worker should make sure executor has exited before updating executor's info
https://issues.apache.org/jira/browse/SPARK-1901
Author: Zhen Peng <zh...@baidu.com>
Closes #854 from zhpengg/bugfix-worker-kills-executor and squashes the following commits:
21d380b [Zhen Peng] add some error messages
506cea6 [Zhen Peng] add some docs for killProcess()
a0b9860 [Zhen Peng] [SPARK-1901] worker should make sure executor has exited before updating executor's info
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1696a447
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1696a447
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1696a447
Branch: refs/heads/branch-1.0
Commit: 1696a44704b8efa6515da9fa311f8acd9dda970e
Parents: 80721fb
Author: Zhen Peng <zh...@baidu.com>
Authored: Fri May 30 10:11:02 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Fri May 30 10:11:02 2014 -0700
----------------------------------------------------------------------
.../spark/deploy/worker/ExecutorRunner.scala | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1696a447/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2051403..d27e0e1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -61,17 +61,23 @@ private[spark] class ExecutorRunner(
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
- killProcess()
+ killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}
- private def killProcess() {
+ /**
+ * kill executor process, wait for exit and notify worker to update resource status
+ *
+ * @param message the exception message which caused the executor's death
+ */
+ private def killProcess(message: Option[String]) {
if (process != null) {
logInfo("Killing process!")
process.destroy()
- process.waitFor()
+ val exitCode = process.waitFor()
+ worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
}
}
@@ -82,7 +88,6 @@ private[spark] class ExecutorRunner(
workerThread.interrupt()
workerThread = null
state = ExecutorState.KILLED
- worker ! ExecutorStateChanged(appId, execId, state, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
@@ -148,14 +153,13 @@ private[spark] class ExecutorRunner(
} catch {
case interrupted: InterruptedException => {
logInfo("Runner thread for executor " + fullId + " interrupted")
- killProcess()
+ state = ExecutorState.KILLED
+ killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
- killProcess()
state = ExecutorState.FAILED
- val message = e.getClass + ": " + e.getMessage
- worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
+ killProcess(Some(e.toString))
}
}
}