Posted to commits@spark.apache.org by tg...@apache.org on 2014/09/03 15:22:57 UTC

git commit: [SPARK-3187] [yarn] Cleanup allocator code.

Repository: spark
Updated Branches:
  refs/heads/master c64cc435e -> 6a72a3694


[SPARK-3187] [yarn] Cleanup allocator code.

Move all shared logic to the base YarnAllocator class, and leave
the version-specific logic in the version-specific module.
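
The refactoring follows a template-method shape: the shared YarnAllocator base class
owns the container bookkeeping and calls two protected hooks, allocateContainers(count)
and releaseContainer(container), which the alpha and stable modules implement against
their respective YARN APIs (see the diff below). The Scala sketch that follows is
illustrative only; Container, AllocateResponseLike, BaseAllocator and FakeAllocator are
simplified stand-ins rather than the real Hadoop/Spark classes.

object AllocatorSketch {

  // Stand-in for a YARN container; the real code uses
  // org.apache.hadoop.yarn.api.records.Container.
  case class Container(id: String, host: String, memoryMb: Int)

  // Mirrors the protected YarnAllocateResponse trait added to YarnAllocator.
  trait AllocateResponseLike {
    def allocatedContainers: Seq[Container]
  }

  // Version-agnostic base class: owns the executor bookkeeping and delegates the
  // actual ResourceManager calls to the two protected hooks below.
  abstract class BaseAllocator(maxExecutors: Int) {
    private var running = 0

    final def allocateResources(): Unit = {
      val missing = math.max(maxExecutors - running, 0)
      val response = allocateContainers(missing)    // version-specific request
      for (container <- response.allocatedContainers) {
        if (running < maxExecutors) {
          running += 1                              // would launch an ExecutorRunnable here
        } else {
          releaseContainer(container)               // version-specific release
        }
      }
    }

    // Hooks that each YARN API version (alpha / stable) must implement.
    protected def allocateContainers(count: Int): AllocateResponseLike
    protected def releaseContainer(container: Container): Unit
  }

  // A hypothetical subclass standing in for YarnAllocationHandler; it fakes the RM response.
  class FakeAllocator(maxExecutors: Int) extends BaseAllocator(maxExecutors) {
    override protected def allocateContainers(count: Int): AllocateResponseLike =
      new AllocateResponseLike {
        val allocatedContainers: Seq[Container] =
          (1 to count).map(i => Container(s"container_$i", s"host$i", 1024))
      }

    override protected def releaseContainer(container: Container): Unit =
      println(s"releasing ${container.id}")
  }

  def main(args: Array[String]): Unit = {
    new FakeAllocator(maxExecutors = 2).allocateResources()
  }
}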

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #2169 from vanzin/SPARK-3187 and squashes the following commits:

46c2826 [Marcelo Vanzin] Hide the privates.
4dc9c83 [Marcelo Vanzin] Actually release containers.
8b1a077 [Marcelo Vanzin] Changes to the Yarn alpha allocator.
f3f5f1d [Marcelo Vanzin] [SPARK-3187] [yarn] Cleanup allocator code.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a72a369
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a72a369
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a72a369

Branch: refs/heads/master
Commit: 6a72a36940311fcb3429bd34c8818bc7d513115c
Parents: c64cc43
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Sep 3 08:22:50 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Wed Sep 3 08:22:50 2014 -0500

----------------------------------------------------------------------
 .../deploy/yarn/YarnAllocationHandler.scala     | 462 +++----------------
 .../spark/deploy/yarn/YarnAllocator.scala       | 425 ++++++++++++++++-
 .../deploy/yarn/YarnAllocationHandler.scala     | 402 +---------------
 3 files changed, 495 insertions(+), 794 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6a72a369/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 629cd13..9f9e16c 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,35 +17,21 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SplitInfo
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.AMRMProtocol
-import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
-import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
-import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest
 import org.apache.hadoop.yarn.util.Records
 
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-
 /**
  * Acquires resources for executors from a ResourceManager and launches executors in new containers.
  */
@@ -56,357 +42,20 @@ private[yarn] class YarnAllocationHandler(
     appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
     preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
-  extends YarnAllocator with Logging {
-
-  // These three are locked on allocatedHostToContainersMap. Complementary data structures
-  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
-  // allocatedContainerToHostMap: container to host mapping.
-  private val allocatedHostToContainersMap =
-    new HashMap[String, collection.mutable.Set[ContainerId]]()
-
-  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
-
-  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
-  // allocated node)
-  // As with the two data structures above, tightly coupled with them, and to be locked on
-  // allocatedHostToContainersMap
-  private val allocatedRackCount = new HashMap[String, Int]()
-
-  // Containers which have been released.
-  private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
-  // Containers to be released in next request to RM
-  private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
-  // Additional memory overhead - in mb.
-  private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
-  private val numExecutorsRunning = new AtomicInteger()
-  // Used to generate a unique id per executor
-  private val executorIdCounter = new AtomicInteger()
-  private val lastResponseId = new AtomicInteger()
-  private val numExecutorsFailed = new AtomicInteger()
-
-  private val maxExecutors = args.numExecutors
-  private val executorMemory = args.executorMemory
-  private val executorCores = args.executorCores
-  private val (preferredHostToCount, preferredRackToCount) =
-    generateNodeToWeight(conf, preferredNodes)
-
-  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
-
-  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
-
-  def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + memoryOverhead)
-  }
-
-  override def allocateResources() = {
-    // We need to send the request only once from what I understand ... but for now, not modifying
-    // this much.
-    val executorsToRequest = Math.max(maxExecutors - numExecutorsRunning.get(), 0)
-
-    // Keep polling the Resource Manager for containers
-    val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
-
-    val _allocatedContainers = amResp.getAllocatedContainers()
-
-    if (_allocatedContainers.size > 0) {
-      logDebug("""
-        Allocated containers: %d
-        Current executor count: %d
-        Containers released: %s
-        Containers to be released: %s
-        Cluster resources: %s
-        """.format(
-          _allocatedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainerList,
-          pendingReleaseContainers,
-          amResp.getAvailableResources))
-
-      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      // Ignore if not satisfying constraints      {
-      for (container <- _allocatedContainers) {
-        if (isResourceConstraintSatisfied(container)) {
-          // allocatedContainers += container
-
-          val host = container.getNodeId.getHost
-          val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
-
-          containers += container
-        } else {
-          // Add all ignored containers to released list
-          releasedContainerList.add(container.getId())
-        }
-      }
-
-      // Find the appropriate containers to use. Slightly non trivial groupBy ...
-      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (candidateHost <- hostToContainers.keySet)
-      {
-        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
-        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
-        var remainingContainers = hostToContainers.get(candidateHost).getOrElse(null)
-        assert(remainingContainers != null)
-
-        if (requiredHostCount >= remainingContainers.size){
-          // Since we got <= required containers, add all to dataLocalContainers
-          dataLocalContainers.put(candidateHost, remainingContainers)
-          // all consumed
-          remainingContainers = null
-        } else if (requiredHostCount > 0) {
-          // Container list has more containers than we need for data locality.
-          // Split into two : data local container count of (remainingContainers.size -
-          // requiredHostCount) and rest as remainingContainer
-          val (dataLocal, remaining) = remainingContainers.splitAt(
-            remainingContainers.size - requiredHostCount)
-          dataLocalContainers.put(candidateHost, dataLocal)
-          // remainingContainers = remaining
-
-          // yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
-          // add remaining to release list. If we have insufficient containers, next allocation
-          // cycle will reallocate (but wont treat it as data local)
-          for (container <- remaining) releasedContainerList.add(container.getId())
-          remainingContainers = null
-        }
-
-        // Now rack local
-        if (remainingContainers != null){
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-
-          if (rack != null){
-            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
-            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
-              rackLocalContainers.get(rack).getOrElse(List()).size
-
-
-            if (requiredRackCount >= remainingContainers.size){
-              // Add all to dataLocalContainers
-              dataLocalContainers.put(rack, remainingContainers)
-              // All consumed
-              remainingContainers = null
-            } else if (requiredRackCount > 0) {
-              // container list has more containers than we need for data locality.
-              // Split into two : data local container count of (remainingContainers.size -
-              // requiredRackCount) and rest as remainingContainer
-              val (rackLocal, remaining) = remainingContainers.splitAt(
-                remainingContainers.size - requiredRackCount)
-              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
-                new ArrayBuffer[Container]())
-
-              existingRackLocal ++= rackLocal
-              remainingContainers = remaining
-            }
-          }
-        }
-
-        // If still not consumed, then it is off rack host - add to that list.
-        if (remainingContainers != null){
-          offRackContainers.put(candidateHost, remainingContainers)
-        }
-      }
-
-      // Now that we have split the containers into various groups, go through them in order :
-      // first host local, then rack local and then off rack (everything else).
-      // Note that the list we create below tries to ensure that not all containers end up within a
-      // host if there are sufficiently large number of hosts/containers.
-
-      val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
-      allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
-      allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
-      allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
-      // Run each of the allocated containers
-      for (container <- allocatedContainers) {
-        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
-        val executorHostname = container.getNodeId.getHost
-        val containerId = container.getId
-
-        assert( container.getResource.getMemory >=
-          (executorMemory + memoryOverhead))
-
-        if (numExecutorsRunningNow > maxExecutors) {
-          logInfo("""Ignoring container %s at host %s, since we already have the required number of
-            containers for it.""".format(containerId, executorHostname))
-          releasedContainerList.add(containerId)
-          // reset counter back to old value.
-          numExecutorsRunning.decrementAndGet()
-        } else {
-          // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
-          // (executorIdCounter)
-          val executorId = executorIdCounter.incrementAndGet().toString
-          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-            SparkEnv.driverActorSystemName,
-            sparkConf.get("spark.driver.host"),
-            sparkConf.get("spark.driver.port"),
-            CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-          logInfo("launching container on " + containerId + " host " + executorHostname)
-          // Just to be safe, simply remove it from pendingReleaseContainers.
-          // Should not be there, but ..
-          pendingReleaseContainers.remove(containerId)
-
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
-          allocatedHostToContainersMap.synchronized {
-            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
-              new HashSet[ContainerId]())
-
-            containerSet += containerId
-            allocatedContainerToHostMap.put(containerId, executorHostname)
-            if (rack != null) {
-              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
-            }
-          }
-
-          new Thread(
-            new ExecutorRunnable(container, conf, sparkConf, driverUrl, executorId,
-              executorHostname, executorMemory, executorCores)
-          ).start()
-        }
-      }
-      logDebug("""
-        Finished processing %d containers.
-        Current number of executors running: %d,
-        releasedContainerList: %s,
-        pendingReleaseContainers: %s
-        """.format(
-          allocatedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainerList,
-          pendingReleaseContainers))
-    }
-
-
-    val completedContainers = amResp.getCompletedContainersStatuses()
-    if (completedContainers.size > 0){
-      logDebug("Completed %d containers, to-be-released: %s".format(
-        completedContainers.size, releasedContainerList))
-      for (completedContainer <- completedContainers){
-        val containerId = completedContainer.getContainerId
-
-        // Was this released by us ? If yes, then simply remove from containerSet and move on.
-        if (pendingReleaseContainers.containsKey(containerId)) {
-          pendingReleaseContainers.remove(containerId)
-        } else {
-          // Simply decrement count - next iteration of ReporterThread will take care of allocating.
-          numExecutorsRunning.decrementAndGet()
-          logInfo("Completed container %s (state: %s, exit status: %s)".format(
-            containerId,
-            completedContainer.getState,
-            completedContainer.getExitStatus()))
-          // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
-          // there are some exit status' we shouldn't necessarily count against us, but for
-          // now I think its ok as none of the containers are expected to exit
-          if (completedContainer.getExitStatus() != 0) {
-            logInfo("Container marked as failed: " + containerId)
-            numExecutorsFailed.incrementAndGet()
-          }
-        }
-
-        allocatedHostToContainersMap.synchronized {
-          if (allocatedContainerToHostMap.containsKey(containerId)) {
-            val host = allocatedContainerToHostMap.get(containerId).getOrElse(null)
-            assert (host != null)
-
-            val containerSet = allocatedHostToContainersMap.get(host).getOrElse(null)
-            assert (containerSet != null)
-
-            containerSet -= containerId
-            if (containerSet.isEmpty) {
-              allocatedHostToContainersMap.remove(host)
-            } else {
-              allocatedHostToContainersMap.update(host, containerSet)
-            }
-
-            allocatedContainerToHostMap -= containerId
-
-            // Doing this within locked context, sigh ... move to outside ?
-            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-            if (rack != null) {
-              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
-              if (rackCount > 0) {
-                allocatedRackCount.put(rack, rackCount)
-              } else {
-                allocatedRackCount.remove(rack)
-              }
-            }
-          }
-        }
-      }
-      logDebug("""
-        Finished processing %d completed containers.
-        Current number of executors running: %d,
-        releasedContainerList: %s,
-        pendingReleaseContainers: %s
-        """.format(
-          completedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainerList,
-          pendingReleaseContainers))
-    }
-  }
-
-  def createRackResourceRequests(hostContainers: List[ResourceRequest]): List[ResourceRequest] = {
-    // First generate modified racks and new set of hosts under it : then issue requests
-    val rackToCounts = new HashMap[String, Int]()
-
-    // Within this lock - used to read/write to the rack related maps too.
-    for (container <- hostContainers) {
-      val candidateHost = container.getHostName
-      val candidateNumContainers = container.getNumContainers
-      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-      if (rack != null) {
-        var count = rackToCounts.getOrElse(rack, 0)
-        count += candidateNumContainers
-        rackToCounts.put(rack, count)
-      }
-    }
-
-    val requestedContainers: ArrayBuffer[ResourceRequest] =
-      new ArrayBuffer[ResourceRequest](rackToCounts.size)
-    for ((rack, count) <- rackToCounts){
-      requestedContainers +=
-        createResourceRequest(AllocationType.RACK, rack, count,
-          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-    }
-
-    requestedContainers.toList
-  }
-
-  def allocatedContainersOnHost(host: String): Int = {
-    var retval = 0
-    allocatedHostToContainersMap.synchronized {
-      retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
-    }
-    retval
-  }
+  extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
 
-  def allocatedContainersOnRack(rack: String): Int = {
-    var retval = 0
-    allocatedHostToContainersMap.synchronized {
-      retval = allocatedRackCount.getOrElse(rack, 0)
-    }
-    retval
-  }
-
-  private def allocateExecutorResources(numExecutors: Int): AllocateResponse = {
+  private val lastResponseId = new AtomicInteger()
+  private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
 
+  override protected def allocateContainers(count: Int): YarnAllocateResponse = {
     var resourceRequests: List[ResourceRequest] = null
 
-      // default.
-    if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
-      logDebug("numExecutors: " + numExecutors + ", host preferences: " +
+    // default.
+    if (count <= 0 || preferredHostToCount.isEmpty) {
+      logDebug("numExecutors: " + count + ", host preferences: " +
         preferredHostToCount.isEmpty)
       resourceRequests = List(createResourceRequest(
-        AllocationType.ANY, null, numExecutors, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
+        AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
     } else {
       // request for all hosts in preferred nodes and for numExecutors -
       // candidates.size, request by default allocation policy.
@@ -429,7 +78,7 @@ private[yarn] class YarnAllocationHandler(
       val anyContainerRequests: ResourceRequest = createResourceRequest(
         AllocationType.ANY,
         resource = null,
-        numExecutors,
+        count,
         YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
 
       val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@@ -451,8 +100,8 @@ private[yarn] class YarnAllocationHandler(
     val releasedContainerList = createReleasedContainerList()
     req.addAllReleases(releasedContainerList)
 
-    if (numExecutors > 0) {
-      logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
+    if (count > 0) {
+      logInfo("Allocating %d executor containers with %d of memory each.".format(count,
         executorMemory + memoryOverhead))
     } else {
       logDebug("Empty allocation req ..  release : " + releasedContainerList)
@@ -466,9 +115,42 @@ private[yarn] class YarnAllocationHandler(
           request.getPriority,
           request.getCapability))
     }
-    resourceManager.allocate(req)
+    new AlphaAllocateResponse(resourceManager.allocate(req).getAMResponse())
   }
 
+  override protected def releaseContainer(container: Container) = {
+    releaseList.add(container.getId())
+  }
+
+  private def createRackResourceRequests(hostContainers: List[ResourceRequest]):
+    List[ResourceRequest] = {
+    // First generate modified racks and new set of hosts under it : then issue requests
+    val rackToCounts = new HashMap[String, Int]()
+
+    // Within this lock - used to read/write to the rack related maps too.
+    for (container <- hostContainers) {
+      val candidateHost = container.getHostName
+      val candidateNumContainers = container.getNumContainers
+      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
+
+      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
+      if (rack != null) {
+        var count = rackToCounts.getOrElse(rack, 0)
+        count += candidateNumContainers
+        rackToCounts.put(rack, count)
+      }
+    }
+
+    val requestedContainers: ArrayBuffer[ResourceRequest] =
+      new ArrayBuffer[ResourceRequest](rackToCounts.size)
+    for ((rack, count) <- rackToCounts){
+      requestedContainers +=
+        createResourceRequest(AllocationType.RACK, rack, count,
+          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
+    }
+
+    requestedContainers.toList
+  }
 
   private def createResourceRequest(
     requestType: AllocationType.AllocationType,
@@ -521,48 +203,24 @@ private[yarn] class YarnAllocationHandler(
     rsrcRequest
   }
 
-  def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
-
+  private def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
     val retval = new ArrayBuffer[ContainerId](1)
     // Iterator on COW list ...
-    for (container <- releasedContainerList.iterator()){
+    for (container <- releaseList.iterator()){
       retval += container
     }
     // Remove from the original list.
-    if (! retval.isEmpty) {
-      releasedContainerList.removeAll(retval)
-      for (v <- retval) pendingReleaseContainers.put(v, true)
-      logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
-        pendingReleaseContainers)
+    if (!retval.isEmpty) {
+      releaseList.removeAll(retval)
+      logInfo("Releasing " + retval.size + " containers.")
     }
-
     retval
   }
 
-  // A simple method to copy the split info map.
-  private def generateNodeToWeight(
-    conf: Configuration,
-    input: collection.Map[String, collection.Set[SplitInfo]]) :
-  // host to count, rack to count
-  (Map[String, Int], Map[String, Int]) = {
-
-    if (input == null) return (Map[String, Int](), Map[String, Int]())
-
-    val hostToCount = new HashMap[String, Int]
-    val rackToCount = new HashMap[String, Int]
-
-    for ((host, splits) <- input) {
-      val hostCount = hostToCount.getOrElse(host, 0)
-      hostToCount.put(host, hostCount + splits.size)
-
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-      if (rack != null){
-        val rackCount = rackToCount.getOrElse(host, 0)
-        rackToCount.put(host, rackCount + splits.size)
-      }
-    }
-
-    (hostToCount.toMap, rackToCount.toMap)
+  private class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse {
+    override def getAllocatedContainers() = response.getAllocatedContainers()
+    override def getAvailableResources() = response.getAvailableResources()
+    override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6a72a369/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index cad94e5..c74dd1c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,18 +17,431 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.{List => JList}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
+
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
+import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+
 object AllocationType extends Enumeration {
   type AllocationType = Value
   val HOST, RACK, ANY = Value
 }
 
+// TODO:
+// Too many params.
+// Needs to be mt-safe
+// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
+// make it more proactive and decoupled.
+
+// Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
+// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
+// more info on how we request containers.
+
 /**
- * Interface that defines a Yarn allocator.
+ * Common code for the Yarn container allocator. Contains all the version-agnostic code to
+ * manage container allocation for a running Spark application.
  */
-trait YarnAllocator {
+private[yarn] abstract class YarnAllocator(
+    conf: Configuration,
+    sparkConf: SparkConf,
+    args: ApplicationMasterArguments,
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+  extends Logging {
 
-  def allocateResources(): Unit
-  def getNumExecutorsFailed: Int
-  def getNumExecutorsRunning: Int
+  // These three are locked on allocatedHostToContainersMap. Complementary data structures
+  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
+  // allocatedContainerToHostMap: container to host mapping.
+  private val allocatedHostToContainersMap =
+    new HashMap[String, collection.mutable.Set[ContainerId]]()
 
-}
+  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+
+  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
+  // allocated node)
+  // As with the two data structures above, tightly coupled with them, and to be locked on
+  // allocatedHostToContainersMap
+  private val allocatedRackCount = new HashMap[String, Int]()
+
+  // Containers to be released in next request to RM
+  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
+
+  // Additional memory overhead - in mb.
+  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+
+  // Number of container requests that have been sent to, but not yet allocated by the
+  // ApplicationMaster.
+  private val numPendingAllocate = new AtomicInteger()
+  private val numExecutorsRunning = new AtomicInteger()
+  // Used to generate a unique id per executor
+  private val executorIdCounter = new AtomicInteger()
+  private val numExecutorsFailed = new AtomicInteger()
+
+  private val maxExecutors = args.numExecutors
+
+  protected val executorMemory = args.executorMemory
+  protected val executorCores = args.executorCores
+  protected val (preferredHostToCount, preferredRackToCount) =
+    generateNodeToWeight(conf, preferredNodes)
+
+  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+
+  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+
+  def allocateResources() = {
+    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
+
+    if (missing > 0) {
+      numPendingAllocate.addAndGet(missing)
+      logInfo("Will Allocate %d executor containers, each with %d memory".format(
+        missing,
+        (executorMemory + memoryOverhead)))
+    } else {
+      logDebug("Empty allocation request ...")
+    }
+
+    val allocateResponse = allocateContainers(missing)
+    val allocatedContainers = allocateResponse.getAllocatedContainers()
+
+    if (allocatedContainers.size > 0) {
+      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
+
+      if (numPendingAllocateNow < 0) {
+        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
+      }
+
+      logDebug("""
+        Allocated containers: %d
+        Current executor count: %d
+        Containers released: %s
+        Cluster resources: %s
+        """.format(
+          allocatedContainers.size,
+          numExecutorsRunning.get(),
+          releasedContainers,
+          allocateResponse.getAvailableResources))
+
+      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+      for (container <- allocatedContainers) {
+        if (isResourceConstraintSatisfied(container)) {
+          // Add the accepted `container` to the host's list of already accepted,
+          // allocated containers
+          val host = container.getNodeId.getHost
+          val containersForHost = hostToContainers.getOrElseUpdate(host,
+            new ArrayBuffer[Container]())
+          containersForHost += container
+        } else {
+          // Release container, since it doesn't satisfy resource constraints.
+          internalReleaseContainer(container)
+        }
+      }
+
+      // Find the appropriate containers to use.
+      // TODO: Cleanup this group-by...
+      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+      for (candidateHost <- hostToContainers.keySet) {
+        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
+        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
+
+        val remainingContainersOpt = hostToContainers.get(candidateHost)
+        assert(remainingContainersOpt.isDefined)
+        var remainingContainers = remainingContainersOpt.get
+
+        if (requiredHostCount >= remainingContainers.size) {
+          // Since we have <= required containers, add all remaining containers to
+          // `dataLocalContainers`.
+          dataLocalContainers.put(candidateHost, remainingContainers)
+          // There are no more free containers remaining.
+          remainingContainers = null
+        } else if (requiredHostCount > 0) {
+          // Container list has more containers than we need for data locality.
+          // Split the list into two: one based on the data local container count,
+          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
+          // containers.
+          val (dataLocal, remaining) = remainingContainers.splitAt(
+            remainingContainers.size - requiredHostCount)
+          dataLocalContainers.put(candidateHost, dataLocal)
+
+          // Invariant: remainingContainers == remaining
+
+          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
+          // Add each container in `remaining` to list of containers to release. If we have an
+          // insufficient number of containers, then the next allocation cycle will reallocate
+          // (but won't treat it as data local).
+          // TODO(harvey): Rephrase this comment some more.
+          for (container <- remaining) internalReleaseContainer(container)
+          remainingContainers = null
+        }
+
+        // For rack local containers
+        if (remainingContainers != null) {
+          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
+          if (rack != null) {
+            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
+            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
+              rackLocalContainers.getOrElse(rack, List()).size
+
+            if (requiredRackCount >= remainingContainers.size) {
+              // Add all remaining containers to `dataLocalContainers`.
+              dataLocalContainers.put(rack, remainingContainers)
+              remainingContainers = null
+            } else if (requiredRackCount > 0) {
+              // Container list has more containers than we need for data locality.
+              // Split the list into two: one based on the rack local container count,
+              // (`remainingContainers.size` - `requiredRackCount`), and the other to hold remaining
+              // containers.
+              val (rackLocal, remaining) = remainingContainers.splitAt(
+                remainingContainers.size - requiredRackCount)
+              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
+                new ArrayBuffer[Container]())
+
+              existingRackLocal ++= rackLocal
+
+              remainingContainers = remaining
+            }
+          }
+        }
+
+        if (remainingContainers != null) {
+          // Not all containers have been consumed - add them to the list of off-rack containers.
+          offRackContainers.put(candidateHost, remainingContainers)
+        }
+      }
+
+      // Now that we have split the containers into various groups, go through them in order:
+      // first host-local, then rack-local, and finally off-rack.
+      // Note that the list we create below tries to ensure that not all containers end up within
+      // a host if there is a sufficiently large number of hosts/containers.
+      val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
+      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
+
+      // Run each of the allocated containers.
+      for (container <- allocatedContainersToProcess) {
+        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+        val executorHostname = container.getNodeId.getHost
+        val containerId = container.getId
+
+        val executorMemoryOverhead = (executorMemory + memoryOverhead)
+        assert(container.getResource.getMemory >= executorMemoryOverhead)
+
+        if (numExecutorsRunningNow > maxExecutors) {
+          logInfo("""Ignoring container %s at host %s, since we already have the required number of
+            containers for it.""".format(containerId, executorHostname))
+          internalReleaseContainer(container)
+          numExecutorsRunning.decrementAndGet()
+        } else {
+          val executorId = executorIdCounter.incrementAndGet().toString
+          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+            SparkEnv.driverActorSystemName,
+            sparkConf.get("spark.driver.host"),
+            sparkConf.get("spark.driver.port"),
+            CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+          logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+
+          // To be safe, remove the container from `releasedContainers`.
+          releasedContainers.remove(containerId)
+
+          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
+          allocatedHostToContainersMap.synchronized {
+            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+              new HashSet[ContainerId]())
+
+            containerSet += containerId
+            allocatedContainerToHostMap.put(containerId, executorHostname)
+
+            if (rack != null) {
+              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+            }
+          }
+          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
+            driverUrl, executorHostname))
+          val executorRunnable = new ExecutorRunnable(
+            container,
+            conf,
+            sparkConf,
+            driverUrl,
+            executorId,
+            executorHostname,
+            executorMemory,
+            executorCores)
+          new Thread(executorRunnable).start()
+        }
+      }
+      logDebug("""
+        Finished allocating %s containers (from %s originally).
+        Current number of executors running: %d,
+        Released containers: %s
+        """.format(
+          allocatedContainersToProcess,
+          allocatedContainers,
+          numExecutorsRunning.get(),
+          releasedContainers))
+    }
+
+    val completedContainers = allocateResponse.getCompletedContainersStatuses()
+    if (completedContainers.size > 0) {
+      logDebug("Completed %d containers".format(completedContainers.size))
+
+      for (completedContainer <- completedContainers) {
+        val containerId = completedContainer.getContainerId
+
+        if (releasedContainers.containsKey(containerId)) {
+          // YarnAllocationHandler already marked the container for release, so remove it from
+          // `releasedContainers`.
+          releasedContainers.remove(containerId)
+        } else {
+          // Decrement the number of executors running. The next iteration of
+          // the ApplicationMaster's reporting thread will take care of allocating.
+          numExecutorsRunning.decrementAndGet()
+          logInfo("Completed container %s (state: %s, exit status: %s)".format(
+            containerId,
+            completedContainer.getState,
+            completedContainer.getExitStatus()))
+          // Hadoop 2.2.X added a ContainerExitStatus that we should switch to using.
+          // There are some exit statuses we shouldn't necessarily count against us, but for
+          // now I think it's ok, as none of the containers are expected to exit.
+          if (completedContainer.getExitStatus() != 0) {
+            logInfo("Container marked as failed: " + containerId)
+            numExecutorsFailed.incrementAndGet()
+          }
+        }
+
+        allocatedHostToContainersMap.synchronized {
+          if (allocatedContainerToHostMap.containsKey(containerId)) {
+            val hostOpt = allocatedContainerToHostMap.get(containerId)
+            assert(hostOpt.isDefined)
+            val host = hostOpt.get
+
+            val containerSetOpt = allocatedHostToContainersMap.get(host)
+            assert(containerSetOpt.isDefined)
+            val containerSet = containerSetOpt.get
+
+            containerSet.remove(containerId)
+            if (containerSet.isEmpty) {
+              allocatedHostToContainersMap.remove(host)
+            } else {
+              allocatedHostToContainersMap.update(host, containerSet)
+            }
+
+            allocatedContainerToHostMap.remove(containerId)
+
+            // TODO: Move this part outside the synchronized block?
+            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
+            if (rack != null) {
+              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
+              if (rackCount > 0) {
+                allocatedRackCount.put(rack, rackCount)
+              } else {
+                allocatedRackCount.remove(rack)
+              }
+            }
+          }
+        }
+      }
+      logDebug("""
+        Finished processing %d completed containers.
+        Current number of executors running: %d,
+        Released containers: %s
+        """.format(
+          completedContainers.size,
+          numExecutorsRunning.get(),
+          releasedContainers))
+    }
+  }
+
+  protected def allocatedContainersOnHost(host: String): Int = {
+    var retval = 0
+    allocatedHostToContainersMap.synchronized {
+      retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
+    }
+    retval
+  }
+
+  protected def allocatedContainersOnRack(rack: String): Int = {
+    var retval = 0
+    allocatedHostToContainersMap.synchronized {
+      retval = allocatedRackCount.getOrElse(rack, 0)
+    }
+    retval
+  }
+
+  private def isResourceConstraintSatisfied(container: Container): Boolean = {
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
+  }
+
+  // A simple method to copy the split info map.
+  private def generateNodeToWeight(
+      conf: Configuration,
+      input: collection.Map[String, collection.Set[SplitInfo]]
+    ): (Map[String, Int], Map[String, Int]) = {
+
+    if (input == null) {
+      return (Map[String, Int](), Map[String, Int]())
+    }
+
+    val hostToCount = new HashMap[String, Int]
+    val rackToCount = new HashMap[String, Int]
+
+    for ((host, splits) <- input) {
+      val hostCount = hostToCount.getOrElse(host, 0)
+      hostToCount.put(host, hostCount + splits.size)
+
+      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
+      if (rack != null) {
+        val rackCount = rackToCount.getOrElse(host, 0)
+        rackToCount.put(host, rackCount + splits.size)
+      }
+    }
+
+    (hostToCount.toMap, rackToCount.toMap)
+  }
+
+  private def internalReleaseContainer(container: Container) = {
+    releasedContainers.put(container.getId(), true)
+    releaseContainer(container)
+  }
+
+  /**
+   * Called to allocate containers in the cluster.
+   *
+   * @param count Number of containers to allocate.
+   *              If zero, should still contact RM (as a heartbeat).
+   * @return Response to the allocation request.
+   */
+  protected def allocateContainers(count: Int): YarnAllocateResponse
+
+  /** Called to release a previously allocated container. */
+  protected def releaseContainer(container: Container): Unit
+
+  /**
+   * Defines the interface for an allocate response from the RM. This is needed since the alpha
+   * and stable interfaces differ here in ways that cannot be fixed using other routes.
+   */
+  protected trait YarnAllocateResponse {
+
+    def getAllocatedContainers(): JList[Container]
+
+    def getAvailableResources(): Resource
+
+    def getCompletedContainersStatuses(): JList[ContainerStatus]
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/6a72a369/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 4d51449..ed31457 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,36 +17,19 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
-import java.util.concurrent.atomic.AtomicInteger
-
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SplitInfo
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
-import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
-import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.util.Records
 
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-
 /**
  * Acquires resources for executors from a ResourceManager and launches executors in new containers.
  */
@@ -57,329 +40,22 @@ private[yarn] class YarnAllocationHandler(
     appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
     preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
-  extends YarnAllocator with Logging {
-
-  // These three are locked on allocatedHostToContainersMap. Complementary data structures
-  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
-  // allocatedContainerToHostMap: container to host mapping.
-  private val allocatedHostToContainersMap =
-    new HashMap[String, collection.mutable.Set[ContainerId]]()
-
-  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
-
-  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
-  // allocated node)
-  // As with the two data structures above, tightly coupled with them, and to be locked on
-  // allocatedHostToContainersMap
-  private val allocatedRackCount = new HashMap[String, Int]()
-
-  // Containers which have been released.
-  private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
-  // Containers to be released in next request to RM
-  private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
-  // Additional memory overhead - in mb.
-  private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
-  // Number of container requests that have been sent to, but not yet allocated by the
-  // ApplicationMaster.
-  private val numPendingAllocate = new AtomicInteger()
-  private val numExecutorsRunning = new AtomicInteger()
-  // Used to generate a unique id per executor
-  private val executorIdCounter = new AtomicInteger()
-  private val lastResponseId = new AtomicInteger()
-  private val numExecutorsFailed = new AtomicInteger()
-
-  private val maxExecutors = args.numExecutors
-  private val executorMemory = args.executorMemory
-  private val executorCores = args.executorCores
-  private val (preferredHostToCount, preferredRackToCount) =
-    generateNodeToWeight(conf, preferredNodes)
-
-  override def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+  extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
 
-  override def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
-
-  def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + memoryOverhead)
-  }
-
-  def releaseContainer(container: Container) {
-    val containerId = container.getId
-    pendingReleaseContainers.put(containerId, true)
-    amClient.releaseAssignedContainer(containerId)
+  override protected def releaseContainer(container: Container) = {
+    amClient.releaseAssignedContainer(container.getId())
   }
 
-  override def allocateResources() = {
-    addResourceRequests(maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get())
+  override protected def allocateContainers(count: Int): YarnAllocateResponse = {
+    addResourceRequests(count)
 
     // We have already set the container request. Poll the ResourceManager for a response.
     // This doubles as a heartbeat if there are no pending container requests.
     val progressIndicator = 0.1f
-    val allocateResponse = amClient.allocate(progressIndicator)
-
-    val allocatedContainers = allocateResponse.getAllocatedContainers()
-    if (allocatedContainers.size > 0) {
-      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
-
-      if (numPendingAllocateNow < 0) {
-        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
-      }
-
-      logDebug("""
-        Allocated containers: %d
-        Current executor count: %d
-        Containers released: %s
-        Containers to-be-released: %s
-        Cluster resources: %s
-        """.format(
-          allocatedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainerList,
-          pendingReleaseContainers,
-          allocateResponse.getAvailableResources))
-
-      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (container <- allocatedContainers) {
-        if (isResourceConstraintSatisfied(container)) {
-          // Add the accepted `container` to the host's list of already accepted,
-          // allocated containers
-          val host = container.getNodeId.getHost
-          val containersForHost = hostToContainers.getOrElseUpdate(host,
-            new ArrayBuffer[Container]())
-          containersForHost += container
-        } else {
-          // Release container, since it doesn't satisfy resource constraints.
-          releaseContainer(container)
-        }
-      }
-
-       // Find the appropriate containers to use.
-      // TODO: Cleanup this group-by...
-      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (candidateHost <- hostToContainers.keySet) {
-        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
-        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
-        val remainingContainersOpt = hostToContainers.get(candidateHost)
-        assert(remainingContainersOpt.isDefined)
-        var remainingContainers = remainingContainersOpt.get
-
-        if (requiredHostCount >= remainingContainers.size) {
-          // Since we have <= required containers, add all remaining containers to
-          // `dataLocalContainers`.
-          dataLocalContainers.put(candidateHost, remainingContainers)
-          // There are no more free containers remaining.
-          remainingContainers = null
-        } else if (requiredHostCount > 0) {
-          // Container list has more containers than we need for data locality.
-          // Split the list into two: one based on the data local container count,
-          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
-          // containers.
-          val (dataLocal, remaining) = remainingContainers.splitAt(
-            remainingContainers.size - requiredHostCount)
-          dataLocalContainers.put(candidateHost, dataLocal)
-
-          // Invariant: remainingContainers == remaining
-
-          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
-          // Add each container in `remaining` to list of containers to release. If we have an
-          // insufficient number of containers, then the next allocation cycle will reallocate
-          // (but won't treat it as data local).
-          // TODO(harvey): Rephrase this comment some more.
-          for (container <- remaining) releaseContainer(container)
-          remainingContainers = null
-        }
-
-        // For rack local containers
-        if (remainingContainers != null) {
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-          if (rack != null) {
-            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
-            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
-              rackLocalContainers.getOrElse(rack, List()).size
-
-            if (requiredRackCount >= remainingContainers.size) {
-              // Add all remaining containers to to `dataLocalContainers`.
-              dataLocalContainers.put(rack, remainingContainers)
-              remainingContainers = null
-            } else if (requiredRackCount > 0) {
-              // Container list has more containers that we need for data locality.
-              // Split the list into two: one based on the data local container count,
-              // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
-              // containers.
-              val (rackLocal, remaining) = remainingContainers.splitAt(
-                remainingContainers.size - requiredRackCount)
-              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
-                new ArrayBuffer[Container]())
-
-              existingRackLocal ++= rackLocal
-
-              remainingContainers = remaining
-            }
-          }
-        }
-
-        if (remainingContainers != null) {
-          // Not all containers have been consumed - add them to the list of off-rack containers.
-          offRackContainers.put(candidateHost, remainingContainers)
-        }
-      }
-
-      // Now that we have split the containers into various groups, go through them in order:
-      // first host-local, then rack-local, and finally off-rack.
-      // Note that the list we create below tries to ensure that not all containers end up within
-      // a host if there is a sufficiently large number of hosts/containers.
-      val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
-      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
-      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
-      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
-      // Run each of the allocated containers.
-      for (container <- allocatedContainersToProcess) {
-        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
-        val executorHostname = container.getNodeId.getHost
-        val containerId = container.getId
-
-        val executorMemoryOverhead = (executorMemory + memoryOverhead)
-        assert(container.getResource.getMemory >= executorMemoryOverhead)
-
-        if (numExecutorsRunningNow > maxExecutors) {
-          logInfo("""Ignoring container %s at host %s, since we already have the required number of
-            containers for it.""".format(containerId, executorHostname))
-          releaseContainer(container)
-          numExecutorsRunning.decrementAndGet()
-        } else {
-          val executorId = executorIdCounter.incrementAndGet().toString
-          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-            SparkEnv.driverActorSystemName,
-            sparkConf.get("spark.driver.host"),
-            sparkConf.get("spark.driver.port"),
-            CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-          logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
-
-          // To be safe, remove the container from `pendingReleaseContainers`.
-          pendingReleaseContainers.remove(containerId)
-
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
-          allocatedHostToContainersMap.synchronized {
-            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
-              new HashSet[ContainerId]())
-
-            containerSet += containerId
-            allocatedContainerToHostMap.put(containerId, executorHostname)
-
-            if (rack != null) {
-              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
-            }
-          }
-          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
-            driverUrl, executorHostname))
-          val executorRunnable = new ExecutorRunnable(
-            container,
-            conf,
-            sparkConf,
-            driverUrl,
-            executorId,
-            executorHostname,
-            executorMemory,
-            executorCores)
-          new Thread(executorRunnable).start()
-        }
-      }
-      logDebug("""
-        Finished allocating %s containers (from %s originally).
-        Current number of executors running: %d,
-        releasedContainerList: %s,
-        pendingReleaseContainers: %s
-        """.format(
-          allocatedContainersToProcess,
-          allocatedContainers,
-          numExecutorsRunning.get(),
-          releasedContainerList,
-          pendingReleaseContainers))
-    }
-
-    val completedContainers = allocateResponse.getCompletedContainersStatuses()
-    if (completedContainers.size > 0) {
-      logDebug("Completed %d containers".format(completedContainers.size))
-
-      for (completedContainer <- completedContainers) {
-        val containerId = completedContainer.getContainerId
-
-        if (pendingReleaseContainers.containsKey(containerId)) {
-          // YarnAllocationHandler already marked the container for release, so remove it from
-          // `pendingReleaseContainers`.
-          pendingReleaseContainers.remove(containerId)
-        } else {
-          // Decrement the number of executors running. The next iteration of
-          // the ApplicationMaster's reporting thread will take care of allocating.
-          numExecutorsRunning.decrementAndGet()
-          logInfo("Completed container %s (state: %s, exit status: %s)".format(
-            containerId,
-            completedContainer.getState,
-            completedContainer.getExitStatus()))
-          // Hadoop 2.2.X added a ContainerExitStatus that we should switch to using.
-          // There are some exit statuses we shouldn't necessarily count against us, but for
-          // now this is OK since none of the containers are expected to exit.
-          if (completedContainer.getExitStatus() != 0) {
-            logInfo("Container marked as failed: " + containerId)
-            numExecutorsFailed.incrementAndGet()
-          }
-        }
-
-        allocatedHostToContainersMap.synchronized {
-          if (allocatedContainerToHostMap.containsKey(containerId)) {
-            val hostOpt = allocatedContainerToHostMap.get(containerId)
-            assert(hostOpt.isDefined)
-            val host = hostOpt.get
-
-            val containerSetOpt = allocatedHostToContainersMap.get(host)
-            assert(containerSetOpt.isDefined)
-            val containerSet = containerSetOpt.get
-
-            containerSet.remove(containerId)
-            if (containerSet.isEmpty) {
-              allocatedHostToContainersMap.remove(host)
-            } else {
-              allocatedHostToContainersMap.update(host, containerSet)
-            }
-
-            allocatedContainerToHostMap.remove(containerId)
-
-            // TODO: Move this part outside the synchronized block?
-            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-            if (rack != null) {
-              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
-              if (rackCount > 0) {
-                allocatedRackCount.put(rack, rackCount)
-              } else {
-                allocatedRackCount.remove(rack)
-              }
-            }
-          }
-        }
-      }
-      logDebug("""
-        Finished processing %d completed containers.
-        Current number of executors running: %d,
-        releasedContainerList: %s,
-        pendingReleaseContainers: %s
-        """.format(
-          completedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainerList,
-          pendingReleaseContainers))
-    }
+    new StableAllocateResponse(amClient.allocate(progressIndicator))
   }
 
-  def createRackResourceRequests(
+  private def createRackResourceRequests(
       hostContainers: ArrayBuffer[ContainerRequest]
     ): ArrayBuffer[ContainerRequest] = {
     // Generate modified racks and new set of hosts under it before issuing requests.
@@ -409,22 +85,6 @@ private[yarn] class YarnAllocationHandler(
     requestedContainers
   }
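  // Illustrative aside (hypothetical helper, simplified): with the stable YARN API, resource
  // requests are built as AMRMClient.ContainerRequest instances, roughly as sketched below.
  // Memory is in MB and the priority value here is only a placeholder.
  import org.apache.hadoop.yarn.api.records.{Priority, Resource}
  import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

  def newRackAwareRequest(
      memoryMb: Int,
      cores: Int,
      hosts: Array[String],
      racks: Array[String]): ContainerRequest = {
    // The capability describes the memory/cores each requested container should receive.
    val capability = Resource.newInstance(memoryMb, cores)
    new ContainerRequest(capability, hosts, racks, Priority.newInstance(1))
  }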
 
-  def allocatedContainersOnHost(host: String): Int = {
-    var retval = 0
-    allocatedHostToContainersMap.synchronized {
-      retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
-    }
-    retval
-  }
-
-  def allocatedContainersOnRack(rack: String): Int = {
-    var retval = 0
-    allocatedHostToContainersMap.synchronized {
-      retval = allocatedRackCount.getOrElse(rack, 0)
-    }
-    retval
-  }
-
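  // Illustrative aside (hypothetical class, simplified): the per-host bookkeeping read by the
  // helpers removed above is a plain mutable map guarded by `synchronized`, roughly:
  import scala.collection.mutable.{HashMap, HashSet}
  import org.apache.hadoop.yarn.api.records.ContainerId

  class AllocationBookkeeping {
    private val hostToContainers = new HashMap[String, HashSet[ContainerId]]()

    // Record a newly allocated container under its host.
    def add(host: String, id: ContainerId): Unit = hostToContainers.synchronized {
      hostToContainers.getOrElseUpdate(host, new HashSet[ContainerId]()) += id
    }

    // Number of containers currently allocated on the given host.
    def countOnHost(host: String): Int = hostToContainers.synchronized {
      hostToContainers.get(host).map(_.size).getOrElse(0)
    }
  }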
   private def addResourceRequests(numExecutors: Int) {
     val containerRequests: List[ContainerRequest] =
       if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
@@ -472,15 +132,6 @@ private[yarn] class YarnAllocationHandler(
       amClient.addContainerRequest(request)
     }
 
-    if (numExecutors > 0) {
-      numPendingAllocate.addAndGet(numExecutors)
-      logInfo("Will Allocate %d executor containers, each with %d memory".format(
-        numExecutors,
-        (executorMemory + memoryOverhead)))
-    } else {
-      logDebug("Empty allocation request ...")
-    }
-
     for (request <- containerRequests) {
       val nodes = request.getNodes
       var hostStr = if (nodes == null || nodes.isEmpty) {
@@ -549,31 +200,10 @@ private[yarn] class YarnAllocationHandler(
     requests
   }
 
-  // Compute per-host and per-rack split counts from the split info map.
-  private def generateNodeToWeight(
-      conf: Configuration,
-      input: collection.Map[String, collection.Set[SplitInfo]]
-    ): (Map[String, Int], Map[String, Int]) = {
-
-    if (input == null) {
-      return (Map[String, Int](), Map[String, Int]())
-    }
-
-    val hostToCount = new HashMap[String, Int]
-    val rackToCount = new HashMap[String, Int]
-
-    for ((host, splits) <- input) {
-      val hostCount = hostToCount.getOrElse(host, 0)
-      hostToCount.put(host, hostCount + splits.size)
-
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-      if (rack != null){
-        val rackCount = rackToCount.getOrElse(rack, 0)
-        rackToCount.put(rack, rackCount + splits.size)
-      }
-    }
-
-    (hostToCount.toMap, rackToCount.toMap)
+  private class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse {
+    override def getAllocatedContainers() = response.getAllocatedContainers()
+    override def getAvailableResources() = response.getAvailableResources()
+    override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
   }
 
 }
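The StableAllocateResponse wrapper added above adapts the Hadoop AllocateResponse to the
version-neutral interface that the shared YarnAllocator now consumes. A minimal sketch of the
shape of that interface (the real YarnAllocateResponse presumably lives in YarnAllocator.scala
and is not shown in this diff; the trait below is only a simplified stand-in):

import java.util.{List => JList}
import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus, Resource}

// Version-neutral view of an allocate() response; each YARN API flavor wraps its
// own response type behind this interface.
trait YarnAllocateResponse {
  def getAllocatedContainers(): JList[Container]
  def getAvailableResources(): Resource
  def getCompletedContainersStatuses(): JList[ContainerStatus]
}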

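The removed allocateContainers logic splits newly allocated containers into host-local,
rack-local and off-rack groups and then orders them via TaskSchedulerImpl.prioritizeContainers
so that a single host does not absorb every container. The sketch below shows one way such an
interleaving could work; interleaveByKey is an invented name, not the actual Spark
implementation:

import scala.collection.mutable.ArrayBuffer

// Round-robin across buckets: take at most one element from each key per pass, so the
// result spreads entries across keys instead of draining one key first.
def interleaveByKey[K, V](buckets: collection.Map[K, ArrayBuffer[V]]): ArrayBuffer[V] = {
  val result = new ArrayBuffer[V]()
  val iterators = buckets.values.map(_.iterator).toSeq
  var progressed = true
  while (progressed) {
    progressed = false
    for (it <- iterators if it.hasNext) {
      result += it.next()
      progressed = true
    }
  }
  result
}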

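The completed-container handling that moved into the base class counts a container as a failed
executor only when the allocator did not itself request its release and the exit status is
non-zero; the Hadoop 2.2+ ContainerExitStatus constants mentioned in the TODO would refine
this. A minimal sketch of that check (the helper name is invented):

import org.apache.hadoop.yarn.api.records.ContainerStatus

// A completed container counts as a failure only if we did not ask for its release
// and it did not exit cleanly (exit status 0).
def countsAsFailure(status: ContainerStatus, requestedRelease: Boolean): Boolean =
  !requestedRelease && status.getExitStatus() != 0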