Posted to commits@spark.apache.org by va...@apache.org on 2016/03/14 19:19:08 UTC

spark git commit: [SPARK-13779][YARN] Avoid cancelling non-local container requests.

Repository: spark
Updated Branches:
  refs/heads/master 45f8053be -> 63f642aea


[SPARK-13779][YARN] Avoid cancelling non-local container requests.

To maximize locality, the YarnAllocator would cancel any requests with a
stale locality preference or no locality preference. This assumed that
the majority of tasks had locality preferences, but that may not be the
case when scanning S3. As a result, container requests for S3 tasks were
constantly cancelled and resubmitted.
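
For context, a minimal hypothetical sketch of the old bookkeeping (the
helper below is not in the patch; its names follow the lines removed in
the diff):

  // Hypothetical helper mirroring the lines removed in the diff below:
  // every stale or locality-free request was cancelled and simply counted
  // back into the next round of requests (updatedNumContainer in the old code).
  def oldUpdatedNumContainers(missing: Int, numStale: Int, numLocalityFree: Int): Int =
    missing + numStale + numLocalityFree
  // For an S3 scan nearly every pending request is locality-free, so each
  // allocation cycle cancels and resubmits the same requests.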

This change makes the allocator cancel only stale requests, plus just
enough requests without locality preferences to make room for new
requests that do have locality preferences. It no longer cancels
locality-free requests only to resubmit them, again without any
locality preference.
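
The new arithmetic, as a minimal hypothetical sketch (the helper is not
part of the patch; variable names follow the diff below, and strategy
stands in for the placement strategy's decision about how many requests
should carry locality preferences):

  // Returns (new locality-aware requests, new any-host requests to submit,
  //          existing any-host requests to cancel).
  def planRequests(missing: Int, numStale: Int, numAnyHost: Int,
      strategy: Int => Int): (Int, Int, Int) = {
    // stale requests are always cancelled; their slots become available again
    val available = missing + numStale
    // any-host requests may also be cancelled, but only if locality needs the room
    val potential = available + numAnyHost
    val numLocal = strategy(potential)
    if (available >= numLocal) {
      // enough room: submit all local requests, fill the rest with any-host ones
      (numLocal, available - numLocal, 0)
    } else {
      // cancel just enough existing any-host requests to make room for local ones
      (numLocal, 0, numLocal - available)
    }
  }

For the S3 case the strategy asks for little or no locality, so nothing
is cancelled and the churn stops.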

We've deployed this patch on our clusters and verified that jobs which previously could not get executors, because their requests were constantly cancelled and resubmitted, now get them as expected. Large jobs are running fine.

Author: Ryan Blue <bl...@apache.org>

Closes #11612 from rdblue/SPARK-13779-fix-yarn-allocator-requests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63f642ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63f642ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63f642ae

Branch: refs/heads/master
Commit: 63f642aea31fe0d202ce585681d51e7ac1715ba7
Parents: 45f8053
Author: Ryan Blue <bl...@apache.org>
Authored: Mon Mar 14 11:18:32 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Mar 14 11:18:37 2016 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/YarnAllocator.scala       | 58 ++++++++++++++++----
 1 file changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/63f642ae/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a96cb49..e34cd8d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -265,25 +265,52 @@ private[yarn] class YarnAllocator(
       // For locality unmatched and locality free container requests, cancel these container
       // requests, since required locality preference has been changed, recalculating using
       // container placement strategy.
-      val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality(
+      val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
         hostToLocalTaskCounts, pendingAllocate)
 
-      // Remove the outdated container request and recalculate the requested container number
-      localityUnMatched.foreach(amClient.removeContainerRequest)
-      localityFree.foreach(amClient.removeContainerRequest)
-      val updatedNumContainer = missing + localityUnMatched.size + localityFree.size
+      // cancel "stale" requests for locations that are no longer needed
+      staleRequests.foreach { stale =>
+        amClient.removeContainerRequest(stale)
+      }
+      val cancelledContainers = staleRequests.size
+      logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)")
+
+      // consider the number of new containers and cancelled stale containers available
+      val availableContainers = missing + cancelledContainers
+
+      // to maximize locality, include requests with no locality preference that can be cancelled
+      val potentialContainers = availableContainers + anyHostRequests.size
 
       val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
-        updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
-          allocatedHostToContainersMap, localityMatched)
+        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
+          allocatedHostToContainersMap, localRequests)
+
+      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
+      containerLocalityPreferences.foreach {
+        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
+          newLocalityRequests.append(createContainerRequest(resource, nodes, racks))
+        case _ =>
+      }
 
-      for (locality <- containerLocalityPreferences) {
-        val request = createContainerRequest(resource, locality.nodes, locality.racks)
+      if (availableContainers >= newLocalityRequests.size) {
+        // more containers are available than needed for locality, fill in requests for any host
+        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
+          newLocalityRequests.append(createContainerRequest(resource, null, null))
+        }
+      } else {
+        val numToCancel = newLocalityRequests.size - availableContainers
+        // cancel some requests without locality preferences to schedule more local containers
+        anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
+          amClient.removeContainerRequest(nonLocal)
+        }
+        logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality")
+      }
+
+      newLocalityRequests.foreach { request =>
         amClient.addContainerRequest(request)
-        val nodes = request.getNodes
-        val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last
-        logInfo(s"Container request (host: $hostStr, capability: $resource)")
+        logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)")
       }
+
     } else if (missing < 0) {
       val numToCancel = math.min(numPendingAllocate, -missing)
       logInfo(s"Canceling requests for $numToCancel executor containers")
@@ -298,6 +325,13 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def hostStr(request: ContainerRequest): String = {
+    Option(request.getNodes) match {
+      case Some(nodes) => nodes.asScala.mkString(",")
+      case None => "Any"
+    }
+  }
+
   /**
    * Creates a container request, handling the reflection required to use YARN features that were
    * added in recent versions.

