You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/12/20 18:36:13 UTC
[spark] branch branch-2.4 updated: [SPARK-26392][YARN] Cancel
pending allocate requests by taking locality preference into account
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new daeb081 [SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into account
daeb081 is described below
commit daeb0811058c76e2d6cecb6de5ebe287c3be3a94
Author: Ngone51 <ng...@163.com>
AuthorDate: Thu Dec 20 10:25:52 2018 -0800
[SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into account
## What changes were proposed in this pull request?
Right now, we cancel pending allocate requests in the order they were sent. I think we can take
locality preference into account when doing this, to minimize the impact on task locality preference.
## How was this patch tested?
N.A.
Closes #23344 from Ngone51/dev-cancel-pending-allocate-requests-by-taking-locality-preference-into-account.
Authored-by: Ngone51 <ng...@163.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
(cherry picked from commit 3d6b44d9ea92dc1eabb8f211176861e51240bf93)
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../apache/spark/deploy/yarn/YarnAllocator.scala | 29 +++++++++-------------
1 file changed, 12 insertions(+), 17 deletions(-)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8a7551d..f4dc80a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -287,20 +287,20 @@ private[yarn] class YarnAllocator(
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
s"executorsStarting: ${numExecutorsStarting.get}")
+ // Split the pending container request into three groups: locality matched list, locality
+ // unmatched list and non-locality list. Take the locality matched container request into
+ // consideration of container placement, treat as allocated containers.
+ // For locality unmatched and locality free container requests, cancel these container
+ // requests, since required locality preference has been changed, recalculating using
+ // container placement strategy.
+ val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
+ hostToLocalTaskCounts, pendingAllocate)
+
if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
- // Split the pending container request into three groups: locality matched list, locality
- // unmatched list and non-locality list. Take the locality matched container request into
- // consideration of container placement, treat as allocated containers.
- // For locality unmatched and locality free container requests, cancel these container
- // requests, since required locality preference has been changed, recalculating using
- // container placement strategy.
- val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
- hostToLocalTaskCounts, pendingAllocate)
-
// cancel "stale" requests for locations that are no longer needed
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
@@ -360,14 +360,9 @@ private[yarn] class YarnAllocator(
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")
-
- val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
- if (!matchingRequests.isEmpty) {
- matchingRequests.iterator().next().asScala
- .take(numToCancel).foreach(amClient.removeContainerRequest)
- } else {
- logWarning("Expected to find pending requests, but found none.")
- }
+ // cancel pending allocate requests by taking locality preference into account
+ val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
+ cancelRequests.foreach(amClient.removeContainerRequest)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org