You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/12/20 18:36:13 UTC

[spark] branch branch-2.4 updated: [SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into account

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new daeb081  [SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into account
daeb081 is described below

commit daeb0811058c76e2d6cecb6de5ebe287c3be3a94
Author: Ngone51 <ng...@163.com>
AuthorDate: Thu Dec 20 10:25:52 2018 -0800

    [SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into account
    
    ## What changes were proposed in this pull request?
    
    Right now, we cancel pending allocate requests by its sending order. I thing we can take
    
    locality preference into account when do this to perfom least impact on task locality preference.
    
    ## How was this patch tested?
    
    N.A.
    
    Closes #23344 from Ngone51/dev-cancel-pending-allocate-requests-by-taking-locality-preference-into-account.
    
    Authored-by: Ngone51 <ng...@163.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
    (cherry picked from commit 3d6b44d9ea92dc1eabb8f211176861e51240bf93)
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 29 +++++++++-------------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8a7551d..f4dc80a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -287,20 +287,20 @@ private[yarn] class YarnAllocator(
       s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
       s"executorsStarting: ${numExecutorsStarting.get}")
 
+    // Split the pending container request into three groups: locality matched list, locality
+    // unmatched list and non-locality list. Take the locality matched container request into
+    // consideration of container placement, treat as allocated containers.
+    // For locality unmatched and locality free container requests, cancel these container
+    // requests, since required locality preference has been changed, recalculating using
+    // container placement strategy.
+    val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
+      hostToLocalTaskCounts, pendingAllocate)
+
     if (missing > 0) {
       logInfo(s"Will request $missing executor container(s), each with " +
         s"${resource.getVirtualCores} core(s) and " +
         s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
 
-      // Split the pending container request into three groups: locality matched list, locality
-      // unmatched list and non-locality list. Take the locality matched container request into
-      // consideration of container placement, treat as allocated containers.
-      // For locality unmatched and locality free container requests, cancel these container
-      // requests, since required locality preference has been changed, recalculating using
-      // container placement strategy.
-      val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
-        hostToLocalTaskCounts, pendingAllocate)
-
       // cancel "stale" requests for locations that are no longer needed
       staleRequests.foreach { stale =>
         amClient.removeContainerRequest(stale)
@@ -360,14 +360,9 @@ private[yarn] class YarnAllocator(
       val numToCancel = math.min(numPendingAllocate, -missing)
       logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
         s"total $targetNumExecutors executors.")
-
-      val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
-      if (!matchingRequests.isEmpty) {
-        matchingRequests.iterator().next().asScala
-          .take(numToCancel).foreach(amClient.removeContainerRequest)
-      } else {
-        logWarning("Expected to find pending requests, but found none.")
-      }
+      // cancel pending allocate requests by taking locality preference into account
+      val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
+      cancelRequests.foreach(amClient.removeContainerRequest)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org