Posted to commits@spark.apache.org by va...@apache.org on 2017/07/25 17:21:51 UTC
spark git commit: [SPARK-21383][YARN] Fix the YarnAllocator allocating more resources than needed
Repository: spark
Updated Branches:
refs/heads/master 799e13161 -> 8de080d9f
[SPARK-21383][YARN] Fix the YarnAllocator allocating more resources than needed
When NodeManagers are slow to launch executors, the `missing` value computed by
the allocator exceeds the real number of missing executors, which can lead YARN
to allocate more resources than needed. We now include `numExecutorsStarting`
when calculating `missing` to avoid this.
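In outline, the patch introduces a `numExecutorsStarting` counter for executors
that have been assigned a container but have not yet finished launching, and
subtracts it in the `missing` calculation. A minimal sketch of the arithmetic
(counter names follow the diff below; the rest of the allocator state is elided):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Both counters are updated from launcher-pool threads as well as the
// allocator thread, hence AtomicInteger rather than a @volatile Int.
val numExecutorsRunning = new AtomicInteger(0)
val numExecutorsStarting = new AtomicInteger(0)

// Before the patch: executors still launching were invisible here, so a
// slow launch inflated `missing` and extra containers were requested.
def missingBefore(target: Int, pending: Int): Int =
  target - pending - numExecutorsRunning.get

// After the patch: executors that hold a container but are still launching
// are subtracted too, keeping `missing` accurate during slow launches.
def missingAfter(target: Int, pending: Int): Int =
  target - pending - numExecutorsStarting.get - numExecutorsRunning.get
```

As the diff shows, `numExecutorsStarting` is incremented just before a container
launch is handed off to the launcher pool, and decremented either when the
executor transitions to running (in `updateInternalState`) or when the launch fails.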
Tested by experiment.
Author: DjvuLee <li...@bytedance.com>
Closes #18651 from djvulee/YarnAllocate.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8de080d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8de080d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8de080d9
Branch: refs/heads/master
Commit: 8de080d9f9d3deac7745f9b3428d97595975701d
Parents: 799e131
Author: DjvuLee <li...@bytedance.com>
Authored: Tue Jul 25 10:21:18 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Jul 25 10:21:25 2017 -0700
----------------------------------------------------------------------
.../spark/deploy/yarn/YarnAllocator.scala | 50 +++++++++++++-------
1 file changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8de080d9/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ed77a6e..cc571c3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
import java.util.Collections
import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import scala.collection.mutable
@@ -30,7 +31,6 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
@@ -80,7 +80,9 @@ private[yarn] class YarnAllocator(
private val releasedContainers = Collections.newSetFromMap[ContainerId](
new ConcurrentHashMap[ContainerId, java.lang.Boolean])
- @volatile private var numExecutorsRunning = 0
+ private val numExecutorsRunning = new AtomicInteger(0)
+
+ private val numExecutorsStarting = new AtomicInteger(0)
/**
* Used to generate a unique ID per executor
@@ -163,7 +165,7 @@ private[yarn] class YarnAllocator(
clock = newClock
}
- def getNumExecutorsRunning: Int = numExecutorsRunning
+ def getNumExecutorsRunning: Int = numExecutorsRunning.get()
def getNumExecutorsFailed: Int = synchronized {
val endTime = clock.getTimeMillis()
@@ -242,7 +244,7 @@ private[yarn] class YarnAllocator(
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
- numExecutorsRunning -= 1
+ numExecutorsRunning.decrementAndGet()
} else {
logWarning(s"Attempted to kill unknown executor $executorId!")
}
@@ -267,10 +269,12 @@ private[yarn] class YarnAllocator(
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
- logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
+ logDebug(("Allocated containers: %d. Current executor count: %d. " +
+ "Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
- numExecutorsRunning,
+ numExecutorsRunning.get,
+ numExecutorsStarting.get,
allocateResponse.getAvailableResources))
handleAllocatedContainers(allocatedContainers.asScala)
@@ -281,7 +285,7 @@ private[yarn] class YarnAllocator(
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
- .format(completedContainers.size, numExecutorsRunning))
+ .format(completedContainers.size, numExecutorsRunning.get))
}
}
@@ -294,7 +298,11 @@ private[yarn] class YarnAllocator(
def updateResourceRequests(): Unit = {
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
- val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+ val missing = targetNumExecutors - numPendingAllocate -
+ numExecutorsStarting.get - numExecutorsRunning.get
+ logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
+ s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
+ s"executorsStarting: ${numExecutorsStarting.get}")
if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
@@ -493,7 +501,8 @@ private[yarn] class YarnAllocator(
s"for executor with ID $executorId")
def updateInternalState(): Unit = synchronized {
- numExecutorsRunning += 1
+ numExecutorsRunning.incrementAndGet()
+ numExecutorsStarting.decrementAndGet()
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId
@@ -503,7 +512,8 @@ private[yarn] class YarnAllocator(
allocatedContainerToHostMap.put(containerId, executorHostname)
}
- if (numExecutorsRunning < targetNumExecutors) {
+ if (numExecutorsRunning.get < targetNumExecutors) {
+ numExecutorsStarting.incrementAndGet()
if (launchContainers) {
launcherPool.execute(new Runnable {
override def run(): Unit = {
@@ -523,11 +533,16 @@ private[yarn] class YarnAllocator(
).run()
updateInternalState()
} catch {
- case NonFatal(e) =>
- logError(s"Failed to launch executor $executorId on container $containerId", e)
- // Assigned container should be released immediately to avoid unnecessary resource
- // occupation.
- amClient.releaseAssignedContainer(containerId)
+ case e: Throwable =>
+ numExecutorsStarting.decrementAndGet()
+ if (NonFatal(e)) {
+ logError(s"Failed to launch executor $executorId on container $containerId", e)
+ // Assigned container should be released immediately
+ // to avoid unnecessary resource occupation.
+ amClient.releaseAssignedContainer(containerId)
+ } else {
+ throw e
+ }
}
}
})
@@ -537,7 +552,8 @@ private[yarn] class YarnAllocator(
}
} else {
logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
- "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
+ "reached target Executors count: %d.").format(
+ numExecutorsRunning.get, targetNumExecutors))
}
}
}
@@ -552,7 +568,7 @@ private[yarn] class YarnAllocator(
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
- numExecutorsRunning -= 1
+ numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
containerId,
onHostStr,