Posted to commits@spark.apache.org by va...@apache.org on 2017/07/25 17:21:51 UTC

spark git commit: [SPARK-21383][YARN] Fix YarnAllocator allocating more resources than needed

Repository: spark
Updated Branches:
  refs/heads/master 799e13161 -> 8de080d9f


[SPARK-21383][YARN] Fix YarnAllocator allocating more resources than needed

When NodeManagers are slow to launch executors, the `missing` value computed
by the YarnAllocator exceeds the real shortfall, which can lead YARN to
allocate more resources than needed.

We now track executors that are still starting in `numExecutorsStarting` and
subtract that count when calculating `missing` to avoid this.

Tested by experiment.
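
Below is a minimal, self-contained sketch of why the extra counter matters
(illustrative object name and numbers only; this is not the actual
YarnAllocator code, see the diff below). An executor whose container has
already been allocated but which is still launching is neither pending nor
running, so without `numExecutorsStarting` it is counted as missing and
re-requested.

    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical sketch: 10 executors wanted, 2 requests still pending with
    // YARN, 5 executors running, 3 containers allocated but still launching.
    object MissingCalcSketch {
      val targetNumExecutors = 10
      val numPendingAllocate = 2
      val numExecutorsRunning = new AtomicInteger(5)
      val numExecutorsStarting = new AtomicInteger(3)

      // Before the fix: the 3 starting executors are invisible, so 3 extra
      // containers get requested.
      def missingBefore: Int =
        targetNumExecutors - numPendingAllocate - numExecutorsRunning.get

      // After the fix: starting executors are subtracted as well, so nothing
      // extra is requested.
      def missingAfter: Int =
        targetNumExecutors - numPendingAllocate -
          numExecutorsStarting.get - numExecutorsRunning.get

      def main(args: Array[String]): Unit = {
        println(s"missing before fix: $missingBefore")  // 3 -> over-allocation
        println(s"missing after fix: $missingAfter")    // 0 -> correct
      }
    }

With these example numbers, `missing` drops from 3 to 0, so no extra
containers are requested while the 3 executors finish starting.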

Author: DjvuLee <li...@bytedance.com>

Closes #18651 from djvulee/YarnAllocate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8de080d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8de080d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8de080d9

Branch: refs/heads/master
Commit: 8de080d9f9d3deac7745f9b3428d97595975701d
Parents: 799e131
Author: DjvuLee <li...@bytedance.com>
Authored: Tue Jul 25 10:21:18 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Jul 25 10:21:25 2017 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/YarnAllocator.scala       | 50 +++++++++++++-------
 1 file changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8de080d9/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ed77a6e..cc571c3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.util.Collections
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
 import scala.collection.mutable
@@ -30,7 +31,6 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
@@ -80,7 +80,9 @@ private[yarn] class YarnAllocator(
   private val releasedContainers = Collections.newSetFromMap[ContainerId](
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
-  @volatile private var numExecutorsRunning = 0
+  private val numExecutorsRunning = new AtomicInteger(0)
+
+  private val numExecutorsStarting = new AtomicInteger(0)
 
   /**
    * Used to generate a unique ID per executor
@@ -163,7 +165,7 @@ private[yarn] class YarnAllocator(
     clock = newClock
   }
 
-  def getNumExecutorsRunning: Int = numExecutorsRunning
+  def getNumExecutorsRunning: Int = numExecutorsRunning.get()
 
   def getNumExecutorsFailed: Int = synchronized {
     val endTime = clock.getTimeMillis()
@@ -242,7 +244,7 @@ private[yarn] class YarnAllocator(
     if (executorIdToContainer.contains(executorId)) {
       val container = executorIdToContainer.get(executorId).get
       internalReleaseContainer(container)
-      numExecutorsRunning -= 1
+      numExecutorsRunning.decrementAndGet()
     } else {
       logWarning(s"Attempted to kill unknown executor $executorId!")
     }
@@ -267,10 +269,12 @@ private[yarn] class YarnAllocator(
     val allocatedContainers = allocateResponse.getAllocatedContainers()
 
     if (allocatedContainers.size > 0) {
-      logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
+      logDebug(("Allocated containers: %d. Current executor count: %d. " +
+        "Launching executor count: %d. Cluster resources: %s.")
         .format(
           allocatedContainers.size,
-          numExecutorsRunning,
+          numExecutorsRunning.get,
+          numExecutorsStarting.get,
           allocateResponse.getAvailableResources))
 
       handleAllocatedContainers(allocatedContainers.asScala)
@@ -281,7 +285,7 @@ private[yarn] class YarnAllocator(
       logDebug("Completed %d containers".format(completedContainers.size))
       processCompletedContainers(completedContainers.asScala)
       logDebug("Finished processing %d completed containers. Current running executor count: %d."
-        .format(completedContainers.size, numExecutorsRunning))
+        .format(completedContainers.size, numExecutorsRunning.get))
     }
   }
 
@@ -294,7 +298,11 @@ private[yarn] class YarnAllocator(
   def updateResourceRequests(): Unit = {
     val pendingAllocate = getPendingAllocate
     val numPendingAllocate = pendingAllocate.size
-    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+    val missing = targetNumExecutors - numPendingAllocate -
+      numExecutorsStarting.get - numExecutorsRunning.get
+    logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
+      s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
+      s"executorsStarting: ${numExecutorsStarting.get}")
 
     if (missing > 0) {
       logInfo(s"Will request $missing executor container(s), each with " +
@@ -493,7 +501,8 @@ private[yarn] class YarnAllocator(
         s"for executor with ID $executorId")
 
       def updateInternalState(): Unit = synchronized {
-        numExecutorsRunning += 1
+        numExecutorsRunning.incrementAndGet()
+        numExecutorsStarting.decrementAndGet()
         executorIdToContainer(executorId) = container
         containerIdToExecutorId(container.getId) = executorId
 
@@ -503,7 +512,8 @@ private[yarn] class YarnAllocator(
         allocatedContainerToHostMap.put(containerId, executorHostname)
       }
 
-      if (numExecutorsRunning < targetNumExecutors) {
+      if (numExecutorsRunning.get < targetNumExecutors) {
+        numExecutorsStarting.incrementAndGet()
         if (launchContainers) {
           launcherPool.execute(new Runnable {
             override def run(): Unit = {
@@ -523,11 +533,16 @@ private[yarn] class YarnAllocator(
                 ).run()
                 updateInternalState()
               } catch {
-                case NonFatal(e) =>
-                  logError(s"Failed to launch executor $executorId on container $containerId", e)
-                  // Assigned container should be released immediately to avoid unnecessary resource
-                  // occupation.
-                  amClient.releaseAssignedContainer(containerId)
+                case e: Throwable =>
+                  numExecutorsStarting.decrementAndGet()
+                  if (NonFatal(e)) {
+                    logError(s"Failed to launch executor $executorId on container $containerId", e)
+                    // Assigned container should be released immediately
+                    // to avoid unnecessary resource occupation.
+                    amClient.releaseAssignedContainer(containerId)
+                  } else {
+                    throw e
+                  }
               }
             }
           })
@@ -537,7 +552,8 @@ private[yarn] class YarnAllocator(
         }
       } else {
         logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
-          "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
+          "reached target Executors count: %d.").format(
+          numExecutorsRunning.get, targetNumExecutors))
       }
     }
   }
@@ -552,7 +568,7 @@ private[yarn] class YarnAllocator(
       val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
-        numExecutorsRunning -= 1
+        numExecutorsRunning.decrementAndGet()
         logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
           containerId,
           onHostStr,

