Posted to commits@spark.apache.org by tg...@apache.org on 2014/06/10 17:37:17 UTC

git commit: [SPARK-1978] In some cases, spark-yarn does not automatically restart failed containers

Repository: spark
Updated Branches:
  refs/heads/master a9a461c59 -> 884ca718b


[SPARK-1978] In some cases, spark-yarn does not automatically restart failed containers

Author: witgo <wi...@qq.com>

Closes #921 from witgo/allocateExecutors and squashes the following commits:

bc3aa66 [witgo] review commit
8800eba [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
32ac7af [witgo] review commit
056b8c7 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
04c6f7e [witgo] Merge branch 'master' into allocateExecutors
aff827c [witgo] review commit
5c376e0 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
1faf4f4 [witgo] Merge branch 'master' into allocateExecutors
3c464bd [witgo] add time limit to allocateExecutors
e00b656 [witgo] In some cases, yarn does not automatically restart the container
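
The patch extracts the executor-failure check and the "re-request missing containers" logic into two helpers and calls them from both the initial allocation loop and the reporter thread, so lost containers are recovered in either phase. Below is a minimal sketch of the pattern, not the actual Spark code: the Allocator trait is a hypothetical stand-in for Spark's YarnAllocationHandler, whose interface the diff only hints at, and all names and signatures here are illustrative.

// Hypothetical stand-in for the YARN allocator the patch talks to.
trait Allocator {
  def numExecutorsRunning: Int
  def numExecutorsFailed: Int
  def numPendingAllocate: Int
  def addResourceRequests(count: Int): Unit
  def allocateResources(): Unit
}

object MissingExecutorLoop {
  // Mirror of allocateMissingExecutor(): re-request containers for executors
  // that are neither running nor pending, i.e. (potentially) lost ones.
  def allocateMissing(allocator: Allocator, target: Int): Unit = {
    val missing = target - allocator.numExecutorsRunning - allocator.numPendingAllocate
    if (missing > 0) {
      println(s"Allocating $missing containers to make up for (potentially) lost containers")
      allocator.addResourceRequests(missing)
    }
  }

  // Mirror of the reworked wait loop: enforce the failure ceiling, top up
  // missing requests, then poll the ResourceManager again.
  def waitForExecutors(
      allocator: Allocator,
      target: Int,
      maxFailures: Int,
      stillRunning: () => Boolean): Unit = {
    allocator.addResourceRequests(target)
    allocator.allocateResources() // first round trip before entering the loop
    while (allocator.numExecutorsRunning < target && stillRunning()) {
      if (allocator.numExecutorsFailed >= maxFailures) {
        // The real code calls finishApplicationMaster(FAILED, ...) here.
        sys.error("max number of executor failures reached")
      }
      allocateMissing(allocator, target)
      allocator.allocateResources()
      Thread.sleep(100)
    }
  }
}

One detail worth noting in the diff itself: allocateResources() is now called once immediately after the initial addResourceRequests(), so the first allocation round trip completes before the wait loop is entered.
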


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/884ca718
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/884ca718
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/884ca718

Branch: refs/heads/master
Commit: 884ca718b24f0bbe93358f2a366463b4e4d31f49
Parents: a9a461c
Author: witgo <wi...@qq.com>
Authored: Tue Jun 10 10:34:57 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Tue Jun 10 10:34:57 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 39 +++++++++++---------
 .../spark/deploy/yarn/ExecutorLauncher.scala    | 22 ++++++-----
 2 files changed, 34 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/884ca718/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c1dfe3f..33a60d9 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -252,15 +252,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     try {
       logInfo("Allocating " + args.numExecutors + " executors.")
       // Wait until all containers have finished
-      // TODO: This is a bit ugly. Can we make it nicer?
-      // TODO: Handle container failure
       yarnAllocator.addResourceRequests(args.numExecutors)
+      yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
       while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
-        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-          finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of executor failures reached")
-        }
+        checkNumExecutorsFailed()
+        allocateMissingExecutor()
         yarnAllocator.allocateResources()
         ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(100)
@@ -289,23 +286,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }
 
+  private def allocateMissingExecutor() {
+    val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+      yarnAllocator.getNumPendingAllocate
+    if (missingExecutorCount > 0) {
+      logInfo("Allocating %d containers to make up for (potentially) lost containers".
+        format(missingExecutorCount))
+      yarnAllocator.addResourceRequests(missingExecutorCount)
+    }
+  }
+
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
+
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
-          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-            finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of executor failures reached")
-          }
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingExecutorCount))
-            yarnAllocator.addResourceRequests(missingExecutorCount)
-          }
+          checkNumExecutorsFailed()
+          allocateMissingExecutor()
           sendProgress()
           Thread.sleep(sleepTime)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/884ca718/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a4ce876..d93e5bb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -200,17 +200,25 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     logInfo("Allocating " + args.numExecutors + " executors.")
     // Wait until all containers have finished
-    // TODO: This is a bit ugly. Can we make it nicer?
-    // TODO: Handle container failure
-
     yarnAllocator.addResourceRequests(args.numExecutors)
+    yarnAllocator.allocateResources()
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      allocateMissingExecutor()
       yarnAllocator.allocateResources()
       Thread.sleep(100)
     }
 
     logInfo("All executors have launched.")
+  }
 
+  private def allocateMissingExecutor() {
+    val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
+      yarnAllocator.getNumPendingAllocate
+    if (missingExecutorCount > 0) {
+      logInfo("Allocating %d containers to make up for (potentially) lost containers".
+        format(missingExecutorCount))
+      yarnAllocator.addResourceRequests(missingExecutorCount)
+    }
   }
 
   // TODO: We might want to extend this to allocate more containers in case they die !
@@ -220,13 +228,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-            yarnAllocator.getNumPendingAllocate
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingExecutorCount))
-            yarnAllocator.addResourceRequests(missingExecutorCount)
-          }
+          allocateMissingExecutor()
           sendProgress()
           Thread.sleep(sleepTime)
         }
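
To see the loop in the sketch above converge, here is a toy run with an in-memory allocator that drops one container partway through. Again purely illustrative, not Spark code; it reuses the hypothetical Allocator and MissingExecutorLoop from the sketch.

object Demo extends App {
  // Toy allocator: every allocateResources() call turns all pending requests
  // into running executors; one container is "lost" once along the way.
  class ToyAllocator extends Allocator {
    private var running = 0
    private var pending = 0
    private var lostOnce = false
    def numExecutorsRunning: Int = running
    def numExecutorsFailed: Int = 0
    def numPendingAllocate: Int = pending
    def addResourceRequests(count: Int): Unit = pending += count
    def allocateResources(): Unit = {
      running += pending
      pending = 0
      // Simulate one lost container so the loop has something to recover.
      if (!lostOnce && running >= 2) { running -= 1; lostOnce = true }
    }
  }

  // target = 3 executors, maxFailures = 2, the "user thread" never exits.
  MissingExecutorLoop.waitForExecutors(new ToyAllocator, 3, 2, () => true)
  println("All executors have launched.")
}

The first allocateResources() call drops one container, the next loop iteration notices one executor is neither running nor pending and re-requests it, and the run exits once all three are up.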