You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/23 03:58:14 UTC
[2/4] git commit: executor creation failed should not make the worker
restart
executor creation failed should not make the worker restart
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f9a95d67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f9a95d67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f9a95d67
Branch: refs/heads/master
Commit: f9a95d67365509cdd260858e858e7a9b120c1d1b
Parents: 792d908
Author: CodingCat <zh...@gmail.com>
Authored: Wed Jan 15 19:32:50 2014 -0500
Committer: CodingCat <zh...@gmail.com>
Committed: Mon Jan 20 02:50:30 2014 -0500
----------------------------------------------------------------------
.../org/apache/spark/deploy/worker/Worker.scala | 32 ++++++++++++--------
1 file changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f9a95d67/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 312560d..c9e4fc2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -208,18 +208,26 @@ private[spark] class Worker(
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
- logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- // TODO (pwendell): We should make sparkHome an Option[String] in
- // ApplicationDescription to be more explicit about this.
- val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
- val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
- executors(appId + "/" + execId) = manager
- manager.start()
- coresUsed += cores_
- memoryUsed += memory_
- masterLock.synchronized {
- master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ try {
+ logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+ // TODO (pwendell): We should make sparkHome an Option[String] in
+ // ApplicationDescription to be more explicit about this.
+ val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
+ val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+ self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
+ executors(appId + "/" + execId) = manager
+ manager.start()
+ coresUsed += cores_
+ memoryUsed += memory_
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ }
+ } catch {
+ case e: Exception => {
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+ }
+ }
}
}