You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2014/06/10 17:37:17 UTC
git commit: [SPARK-1978] In some cases, spark-yarn does not automatically restart the failed container
Repository: spark
Updated Branches:
refs/heads/master a9a461c59 -> 884ca718b
[SPARK-1978] In some cases, spark-yarn does not automatically restart the failed container
Author: witgo <wi...@qq.com>
Closes #921 from witgo/allocateExecutors and squashes the following commits:
bc3aa66 [witgo] review commit
8800eba [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
32ac7af [witgo] review commit
056b8c7 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
04c6f7e [witgo] Merge branch 'master' into allocateExecutors
aff827c [witgo] review commit
5c376e0 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
1faf4f4 [witgo] Merge branch 'master' into allocateExecutors
3c464bd [witgo] add time limit to allocateExecutors
e00b656 [witgo] In some cases, yarn does not automatically restart the container
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/884ca718
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/884ca718
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/884ca718
Branch: refs/heads/master
Commit: 884ca718b24f0bbe93358f2a366463b4e4d31f49
Parents: a9a461c
Author: witgo <wi...@qq.com>
Authored: Tue Jun 10 10:34:57 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Tue Jun 10 10:34:57 2014 -0500
----------------------------------------------------------------------
.../spark/deploy/yarn/ApplicationMaster.scala | 39 +++++++++++---------
.../spark/deploy/yarn/ExecutorLauncher.scala | 22 ++++++-----
2 files changed, 34 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/884ca718/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c1dfe3f..33a60d9 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -252,15 +252,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
try {
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
yarnAllocator.addResourceRequests(args.numExecutors)
+ yarnAllocator.allocateResources()
// Exits the loop if the user thread exits.
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- }
+ checkNumExecutorsFailed()
+ allocateMissingExecutor()
yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
@@ -289,23 +286,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
+ private def allocateMissingExecutor() {
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+ yarnAllocator.getNumPendingAllocate
+ if (missingExecutorCount > 0) {
+ logInfo("Allocating %d containers to make up for (potentially) lost containers".
+ format(missingExecutorCount))
+ yarnAllocator.addResourceRequests(missingExecutorCount)
+ }
+ }
+
+ private def checkNumExecutorsFailed() {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of executor failures reached")
+ }
+ }
+
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
while (userThread.isAlive) {
- if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of executor failures reached")
- }
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
- yarnAllocator.getNumPendingAllocate
- if (missingExecutorCount > 0) {
- logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingExecutorCount))
- yarnAllocator.addResourceRequests(missingExecutorCount)
- }
+ checkNumExecutorsFailed()
+ allocateMissingExecutor()
sendProgress()
Thread.sleep(sleepTime)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/884ca718/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a4ce876..d93e5bb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -200,17 +200,25 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
-
yarnAllocator.addResourceRequests(args.numExecutors)
+ yarnAllocator.allocateResources()
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+ allocateMissingExecutor()
yarnAllocator.allocateResources()
Thread.sleep(100)
}
logInfo("All executors have launched.")
+ }
+ private def allocateMissingExecutor() {
+ val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+ yarnAllocator.getNumPendingAllocate
+ if (missingExecutorCount > 0) {
+ logInfo("Allocating %d containers to make up for (potentially) lost containers".
+ format(missingExecutorCount))
+ yarnAllocator.addResourceRequests(missingExecutorCount)
+ }
}
// TODO: We might want to extend this to allocate more containers in case they die !
@@ -220,13 +228,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val t = new Thread {
override def run() {
while (!driverClosed) {
- val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
- yarnAllocator.getNumPendingAllocate
- if (missingExecutorCount > 0) {
- logInfo("Allocating %d containers to make up for (potentially) lost containers".
- format(missingExecutorCount))
- yarnAllocator.addResourceRequests(missingExecutorCount)
- }
+ allocateMissingExecutor()
sendProgress()
Thread.sleep(sleepTime)
}