Posted to commits@spark.apache.org by tg...@apache.org on 2014/07/24 21:46:26 UTC

git commit: [SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures

Repository: spark
Updated Branches:
  refs/heads/master c960b5051 -> 323a83c52


[SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures

Author: GuoQiang Li <wi...@qq.com>

Closes #1180 from witgo/SPARK-2037 and squashes the following commits:

3d52411 [GuoQiang Li] review commit
7058f4d [GuoQiang Li] Correctly stop SparkContext
6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures
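
The threshold itself resolves in three steps: the new spark.yarn.max.executor.failures key, then the deprecated spark.yarn.max.worker.failures key, then a computed default of numExecutors * 2 with a floor of 3. A minimal, self-contained sketch of that resolution logic (the key names come from the diff below; the wrapper object and main method are illustrative only):

    import org.apache.spark.SparkConf

    object MaxFailuresSketch {
      // Mirrors the fallback chain added in this commit: new key, then the
      // deprecated key, then max(numExecutors * 2, 3) as the computed default.
      def maxNumExecutorFailures(sparkConf: SparkConf, numExecutors: Int): Int =
        sparkConf.getInt("spark.yarn.max.executor.failures",
          sparkConf.getInt("spark.yarn.max.worker.failures",
            math.max(numExecutors * 2, 3)))

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
        println(maxNumExecutorFailures(conf, 4))  // no keys set: max(4 * 2, 3) = 8
        conf.set("spark.yarn.max.executor.failures", "10")
        println(maxNumExecutorFailures(conf, 4))  // explicit key wins: 10
      }
    }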


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/323a83c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/323a83c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/323a83c5

Branch: refs/heads/master
Commit: 323a83c5235f9289cd9526491d62365df96a429b
Parents: c960b50
Author: GuoQiang Li <wi...@qq.com>
Authored: Thu Jul 24 14:46:10 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Thu Jul 24 14:46:10 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ExecutorLauncher.scala    | 80 +++++++++++++-------
 .../cluster/YarnClientSchedulerBackend.scala    | 28 +++++++
 .../spark/deploy/yarn/ExecutorLauncher.scala    | 45 ++++++++---
 3 files changed, 115 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/323a83c5/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index d232c18..184e2ad 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +56,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
+
+  private var driverClosed: Boolean = false
+  private var isFinished: Boolean = false
+  private var registered: Boolean = false
+
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   val securityManager = new SecurityManager(sparkConf)
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf, securityManager = securityManager)._1
   var actor: ActorRef = _
 
@@ -97,23 +103,26 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
 
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
-    // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
-    if (minimumMemory > 0) {
-      val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-        YarnAllocationHandler.MEMORY_OVERHEAD)
-      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-      if (numCore > 0) {
-        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
+    synchronized {
+      if (!isFinished) {
+        val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+        // Compute number of threads for akka
+        val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+
+        if (minimumMemory > 0) {
+          val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+            YarnAllocationHandler.MEMORY_OVERHEAD)
+          val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+          if (numCore > 0) {
+            // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+            // TODO: Uncomment when hadoop is on a version which has this fixed.
+            // args.workerCores = numCore
+          }
+        }
+        registered = true
       }
     }
-
     waitForSparkMaster()
     addAmIpFilter()
     // Allocate all containers
@@ -243,11 +252,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+      checkNumExecutorsFailed()
       Thread.sleep(100)
     }
 
     logInfo("All executors have launched.")
-
+  }
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
   }
 
   // TODO: We might want to extend this to allocate more containers in case they die !
@@ -257,6 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
+          checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
             logInfo("Allocating " + missingExecutorCount +
@@ -282,15 +298,23 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     yarnAllocator.allocateContainers(0)
   }
 
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-
-    logInfo("finish ApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", ""))
-    resourceManager.finishApplicationMaster(finishReq)
+  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      logInfo("Unregistering ApplicationMaster with " + status)
+      if (registered) {
+        val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+          .asInstanceOf[FinishApplicationMasterRequest]
+        finishReq.setAppAttemptId(appAttemptId)
+        finishReq.setFinishApplicationStatus(status)
+        finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
+        finishReq.setDiagnostics(appMessage)
+        resourceManager.finishApplicationMaster(finishReq)
+      }
+      isFinished = true
+    }
   }
 
 }
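
The shape of the change in both ExecutorLauncher variants is the same guard: registration and unregistration are serialized on the object lock, isFinished turns any second call to finishApplicationMaster into a no-op, and registered ensures the AM never tries to unregister with the ResourceManager if it was asked to finish before registration completed. A stripped-down sketch of that pattern (FakeRMClient and its methods are placeholders, not Spark or YARN API):

    // Placeholder standing in for the real ResourceManager client.
    class FakeRMClient {
      def register(): Unit = println("registered with RM")
      def unregister(status: String, message: String): Unit =
        println(s"unregistered: $status ($message)")
    }

    class AmLifecycle(client: FakeRMClient) {
      private var isFinished = false
      private var registered = false

      def start(): Unit = synchronized {
        if (!isFinished) {      // skip registration if already told to finish
          client.register()
          registered = true
        }
      }

      def finish(status: String, message: String = ""): Unit = synchronized {
        if (!isFinished) {      // idempotent: later callers are no-ops
          if (registered) {     // only unregister if registration happened
            client.unregister(status, message)
          }
          isFinished = true
        }
      }
    }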

http://git-wip-us.apache.org/repos/asf/spark/blob/323a83c5/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 77b91f8..f8fb96b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -37,6 +37,8 @@ private[spark] class YarnClientSchedulerBackend(
 
   var client: Client = null
   var appId: ApplicationId = null
+  var checkerThread: Thread = null
+  var stopping: Boolean = false
 
   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
       arrayBuf: ArrayBuffer[String]) {
@@ -86,6 +88,7 @@ private[spark] class YarnClientSchedulerBackend(
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
+    checkerThread = yarnApplicationStateCheckerThread()
   }
 
   def waitForApp() {
@@ -116,7 +119,32 @@ private[spark] class YarnClientSchedulerBackend(
     }
   }
 
+  private def yarnApplicationStateCheckerThread(): Thread = {
+    val t = new Thread {
+      override def run() {
+        while (!stopping) {
+          val report = client.getApplicationReport(appId)
+          val state = report.getYarnApplicationState()
+          if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
+            || state == YarnApplicationState.FAILED) {
+            logError(s"Yarn application already ended: $state")
+            sc.stop()
+            stopping = true
+          }
+          Thread.sleep(1000L)
+        }
+        checkerThread = null
+        Thread.currentThread().interrupt()
+      }
+    }
+    t.setName("Yarn Application State Checker")
+    t.setDaemon(true)
+    t.start()
+    t
+  }
+
   override def stop() {
+    stopping = true
     super.stop()
     client.stop
     logInfo("Stopped")

http://git-wip-us.apache.org/repos/asf/spark/blob/323a83c5/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 7158d94..fc7b832 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -19,15 +19,12 @@ package org.apache.spark.deploy.yarn
 
 import java.net.Socket
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +54,16 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
+  private var driverClosed: Boolean = false
+  private var isFinished: Boolean = false
+  private var registered: Boolean = false
 
   private var amClient: AMRMClient[ContainerRequest] = _
 
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
   val securityManager = new SecurityManager(sparkConf)
   val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf, securityManager = securityManager)._1
@@ -101,7 +104,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     amClient.start()
 
     appAttemptId = ApplicationMaster.getApplicationAttemptId()
-    registerApplicationMaster()
+    synchronized {
+      if (!isFinished) {
+        registerApplicationMaster()
+        registered = true
+      }
+    }
 
     waitForSparkMaster()
     addAmIpFilter()
@@ -210,6 +218,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      checkNumExecutorsFailed()
       allocateMissingExecutor()
       yarnAllocator.allocateResources()
       Thread.sleep(100)
@@ -228,12 +237,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
   }
 
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
+
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
+          checkNumExecutorsFailed()
           allocateMissingExecutor()
           logDebug("Sending progress")
           yarnAllocator.allocateResources()
@@ -248,10 +265,18 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     t
   }
 
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("Unregistering ApplicationMaster with " + status)
-    val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "")
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
+  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      logInfo("Unregistering ApplicationMaster with " + status)
+      if (registered) {
+        val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+        amClient.unregisterApplicationMaster(status, appMessage, trackingUrl)
+      }
+      isFinished = true
+    }
   }
 
 }
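
With this change, yarn-client mode honors the same failure threshold that cluster mode already enforced. For illustration only, a minimal client-mode setup as it looked in this era of Spark, e.g. pasted into spark-shell (the threshold value is arbitrary):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("max-failures-demo")
      // fail the application once 10 executors have been lost
      .set("spark.yarn.max.executor.failures", "10")
    val sc = new SparkContext(conf)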