You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2014/09/13 03:31:17 UTC

git commit: [SPARK-3456] YarnAllocator on alpha can lose container requests to RM

Repository: spark
Updated Branches:
  refs/heads/master af2583826 -> 25311c2c5


[SPARK-3456] YarnAllocator on alpha can lose container requests to RM

Author: Thomas Graves <tg...@apache.org>

Closes #2373 from tgravescs/SPARK-3456 and squashes the following commits:

77e9532 [Thomas Graves] [SPARK-3456] YarnAllocator on alpha can lose container requests to RM


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25311c2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25311c2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25311c2c

Branch: refs/heads/master
Commit: 25311c2c545a60eb9dcf704814d4600987852155
Parents: af25838
Author: Thomas Graves <tg...@apache.org>
Authored: Fri Sep 12 20:31:11 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Fri Sep 12 20:31:11 2014 -0500

----------------------------------------------------------------------
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala | 11 ++++++-----
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala     |  8 ++++++--
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala |  3 ++-
 3 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/25311c2c/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 5a1b42c..6c93d85 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -48,16 +48,17 @@ private[yarn] class YarnAllocationHandler(
   private val lastResponseId = new AtomicInteger()
   private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
 
-  override protected def allocateContainers(count: Int): YarnAllocateResponse = {
+  override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
     var resourceRequests: List[ResourceRequest] = null
 
-    logDebug("numExecutors: " + count)
+    logDebug("asking for additional executors: " + count + " with already pending: " + pending)
+    val totalNumAsk = count + pending
     if (count <= 0) {
       resourceRequests = List()
     } else if (preferredHostToCount.isEmpty) {
         logDebug("host preferences is empty")
         resourceRequests = List(createResourceRequest(
-          AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
+          AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
     } else {
       // request for all hosts in preferred nodes and for numExecutors -
       // candidates.size, request by default allocation policy.
@@ -80,7 +81,7 @@ private[yarn] class YarnAllocationHandler(
       val anyContainerRequests: ResourceRequest = createResourceRequest(
         AllocationType.ANY,
         resource = null,
-        count,
+        totalNumAsk,
         YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
 
       val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@@ -103,7 +104,7 @@ private[yarn] class YarnAllocationHandler(
     req.addAllReleases(releasedContainerList)
 
     if (count > 0) {
-      logInfo("Allocating %d executor containers with %d of memory each.".format(count,
+      logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk,
         executorMemory + memoryOverhead))
     } else {
       logDebug("Empty allocation req ..  release : " + releasedContainerList)

http://git-wip-us.apache.org/repos/asf/spark/blob/25311c2c/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0b8744f..299e38a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -112,6 +112,9 @@ private[yarn] abstract class YarnAllocator(
   def allocateResources() = {
     val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
 
+    // this is needed by alpha, do it here since we add numPending right after this
+    val executorsPending = numPendingAllocate.get()
+
     if (missing > 0) {
       numPendingAllocate.addAndGet(missing)
       logInfo("Will Allocate %d executor containers, each with %d memory".format(
@@ -121,7 +124,7 @@ private[yarn] abstract class YarnAllocator(
       logDebug("Empty allocation request ...")
     }
 
-    val allocateResponse = allocateContainers(missing)
+    val allocateResponse = allocateContainers(missing, executorsPending)
     val allocatedContainers = allocateResponse.getAllocatedContainers()
 
     if (allocatedContainers.size > 0) {
@@ -435,9 +438,10 @@ private[yarn] abstract class YarnAllocator(
    *
    * @param count Number of containers to allocate.
    *              If zero, should still contact RM (as a heartbeat).
+   * @param pending Number of containers pending allocate. Only used on alpha.
    * @return Response to the allocation request.
    */
-  protected def allocateContainers(count: Int): YarnAllocateResponse
+  protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
 
   /** Called to release a previously allocated container. */
   protected def releaseContainer(container: Container): Unit

http://git-wip-us.apache.org/repos/asf/spark/blob/25311c2c/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 5438f15..e44a8db 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -47,7 +47,8 @@ private[yarn] class YarnAllocationHandler(
     amClient.releaseAssignedContainer(container.getId())
   }
 
-  override protected def allocateContainers(count: Int): YarnAllocateResponse = {
+  // pending isn't used on stable as the AMRMClient handles incremental asks
+  override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
     addResourceRequests(count)
 
     // We have already set the container request. Poll the ResourceManager for a response.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org