Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2014/08/28 00:28:05 UTC

[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/2169

    [SPARK-3187] [yarn] Cleanup allocator code.

    Move all shared logic to the base YarnAllocator class, and leave
    the version-specific logic in the version-specific module.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-3187

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2169.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2169
    
----
commit f3f5f1d9aa982f6a431d8a647d436ba15a5e9ffe
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-22T23:52:51Z

    [SPARK-3187] [yarn] Cleanup allocator code.
    
    Move all shared logic to the base YarnAllocator class, and leave
    the version-specific logic in the version-specific module.
    
    (This is patch 1/2 and only touches the stable API.)

commit 8b1a077fb9ec994286f69791b5834bb4f0627dca
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-25T20:39:29Z

    Changes to the Yarn alpha allocator.
    
    Due to incompatibilities in the alpha and stable interfaces, some
    indirection had to be added so that the common code can interpret
    the RM responses.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53650275
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19359/consoleFull) for PR 2169 at commit [`8b1a077`](https://github.com/apache/spark/commit/8b1a077fb9ec994286f69791b5834bb4f0627dca).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16916806
  
    --- Diff: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---
    @@ -521,48 +203,24 @@ private[yarn] class YarnAllocationHandler(
         rsrcRequest
       }
     
    -  def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
    -
    +  private def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
         val retval = new ArrayBuffer[ContainerId](1)
         // Iterator on COW list ...
    -    for (container <- releasedContainerList.iterator()){
    +    for (container <- releaseList.iterator()){
           retval += container
         }
         // Remove from the original list.
    -    if (! retval.isEmpty) {
    -      releasedContainerList.removeAll(retval)
    -      for (v <- retval) pendingReleaseContainers.put(v, true)
    -      logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
    -        pendingReleaseContainers)
    +    if (!retval.isEmpty) {
    +      releaseList.removeAll(retval)
    +      logInfo("Releasing " + retval.size + " containers.")
         }
    -
         retval
       }
     
    -  // A simple method to copy the split info map.
    -  private def generateNodeToWeight(
    -    conf: Configuration,
    -    input: collection.Map[String, collection.Set[SplitInfo]]) :
    -  // host to count, rack to count
    -  (Map[String, Int], Map[String, Int]) = {
    -
    -    if (input == null) return (Map[String, Int](), Map[String, Int]())
    -
    -    val hostToCount = new HashMap[String, Int]
    -    val rackToCount = new HashMap[String, Int]
    -
    -    for ((host, splits) <- input) {
    -      val hostCount = hostToCount.getOrElse(host, 0)
    -      hostToCount.put(host, hostCount + splits.size)
    -
    -      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
    -      if (rack != null){
    -        val rackCount = rackToCount.getOrElse(host, 0)
    -        rackToCount.put(host, rackCount + splits.size)
    -      }
    -    }
    -
    -    (hostToCount.toMap, rackToCount.toMap)
    +  class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse {
    --- End diff --
    
    I can add it, but I thought the parent's visibility already took care of hiding this class.




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53654401
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19359/consoleFull) for PR 2169 at commit [`8b1a077`](https://github.com/apache/spark/commit/8b1a077fb9ec994286f69791b5834bb4f0627dca).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse `
      * `  protected trait YarnAllocateResponse `
      * `  class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse `





[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53745219
  
    @tgravescs I think they're complementary. This change merges a lot of the logic between the alpha and stable allocators. @sryza's change cleans up the Yarn API usage in the stable allocator by using a few different APIs. So it should still apply on top of these changes (after the obvious conflict resolution).




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53909016
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19487/consoleFull) for PR 2169 at commit [`4dc9c83`](https://github.com/apache/spark/commit/4dc9c839338918266c2d31727f9b7b302c4b3c76).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53908798
  
    Jenkins, test this please.




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-54311828
  
    @vanzin Looks like I missed something, at least on 0.23. I'm running more tests, and it appears that if it doesn't get the executors right away it hangs forever: it keeps asking for 0 executors even though it doesn't have any. I haven't tried on 2.x yet, but I will debug and file a JIRA once I figure out the problem.




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53650117
  
    @tgravescs as with the previous change, it would be greatly appreciated if you guys could give this a spin on a yarn-alpha cluster. :-) The code compiles, but I don't have a way to test it.
    
    Tested on stable, in both client and cluster modes, and made sure that executors are restarted properly (and that jobs still fail after enough failures).




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2169




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53909614
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19488/consoleFull) for PR 2169 at commit [`4dc9c83`](https://github.com/apache/spark/commit/4dc9c839338918266c2d31727f9b7b302c4b3c76).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53922107
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19491/consoleFull) for PR 2169 at commit [`46c2826`](https://github.com/apache/spark/commit/46c282656bc3726233150f1cf965d140f330aaa7).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16897472
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -17,18 +17,430 @@
     
     package org.apache.spark.deploy.yarn
     
    +import java.util.{List => JList}
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +
     object AllocationType extends Enumeration {
       type AllocationType = Value
       val HOST, RACK, ANY = Value
     }
     
    +// TODO:
    +// Too many params.
    +// Needs to be mt-safe
    +// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
    +// make it more proactive and decoupled.
    +
    +// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
    +// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
    +// more info on how we are requesting for containers.
    +
     /**
    - * Interface that defines a Yarn allocator.
    + * Common code for the Yarn container allocator. Contains all the version-agnostic code to
    + * manage container allocation for a running Spark application.
      */
    -trait YarnAllocator {
    +private[yarn] abstract class YarnAllocator(
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    args: ApplicationMasterArguments,
    +    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
    +  extends Logging {
     
    -  def allocateResources(): Unit
    -  def getNumExecutorsFailed: Int
    -  def getNumExecutorsRunning: Int
    +  // These three are locked on allocatedHostToContainersMap. Complementary data structures
    +  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
    +  // allocatedContainerToHostMap: container to host mapping.
    +  private val allocatedHostToContainersMap =
    +    new HashMap[String, collection.mutable.Set[ContainerId]]()
     
    -}
    +  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
    +
    +  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
    +  // allocated node)
    +  // As with the two data structures above, tightly coupled with them, and to be locked on
    +  // allocatedHostToContainersMap
    +  private val allocatedRackCount = new HashMap[String, Int]()
    +
    +  // Containers to be released in next request to RM
    +  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
    +
    +  // Additional memory overhead - in mb.
    +  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    +    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    +
    +  // Number of container requests that have been sent to, but not yet allocated by the
    +  // ApplicationMaster.
    +  private val numPendingAllocate = new AtomicInteger()
    +  private val numExecutorsRunning = new AtomicInteger()
    +  // Used to generate a unique id per executor
    +  private val executorIdCounter = new AtomicInteger()
    +  private val numExecutorsFailed = new AtomicInteger()
    +
    +  private val maxExecutors = args.numExecutors
    +
    +  protected val executorMemory = args.executorMemory
    +  protected val executorCores = args.executorCores
    +  protected val (preferredHostToCount, preferredRackToCount) =
    +    generateNodeToWeight(conf, preferredNodes)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +
    +  def allocateResources() = {
    +    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
    +
    +    if (missing > 0) {
    +      numPendingAllocate.addAndGet(missing)
    +      logInfo("Will Allocate %d executor containers, each with %d memory".format(
    +        missing,
    +        (executorMemory + memoryOverhead)))
    +    } else {
    +      logDebug("Empty allocation request ...")
    +    }
    +
    +    val allocateResponse = allocateContainers(missing)
    +    val allocatedContainers = allocateResponse.getAllocatedContainers()
    +
    +    if (allocatedContainers.size > 0) {
    +      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    +
    +      if (numPendingAllocateNow < 0) {
    +        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    --- End diff --
    
    I know you just copied and pasted this, but it seems wrong. I would have thought it would add back any amount below 0, to set it to 0.
    





[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16897553
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -17,18 +17,430 @@
     
     package org.apache.spark.deploy.yarn
     
    +import java.util.{List => JList}
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +
     object AllocationType extends Enumeration {
       type AllocationType = Value
       val HOST, RACK, ANY = Value
     }
     
    +// TODO:
    +// Too many params.
    +// Needs to be mt-safe
    +// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
    +// make it more proactive and decoupled.
    +
    +// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
    +// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
    +// more info on how we are requesting for containers.
    +
     /**
    - * Interface that defines a Yarn allocator.
    + * Common code for the Yarn container allocator. Contains all the version-agnostic code to
    + * manage container allocation for a running Spark application.
      */
    -trait YarnAllocator {
    +private[yarn] abstract class YarnAllocator(
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    args: ApplicationMasterArguments,
    +    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
    +  extends Logging {
     
    -  def allocateResources(): Unit
    -  def getNumExecutorsFailed: Int
    -  def getNumExecutorsRunning: Int
    +  // These three are locked on allocatedHostToContainersMap. Complementary data structures
    +  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
    +  // allocatedContainerToHostMap: container to host mapping.
    +  private val allocatedHostToContainersMap =
    +    new HashMap[String, collection.mutable.Set[ContainerId]]()
     
    -}
    +  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
    +
    +  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
    +  // allocated node)
    +  // As with the two data structures above, tightly coupled with them, and to be locked on
    +  // allocatedHostToContainersMap
    +  private val allocatedRackCount = new HashMap[String, Int]()
    +
    +  // Containers to be released in next request to RM
    +  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
    +
    +  // Additional memory overhead - in mb.
    +  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    +    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    +
    +  // Number of container requests that have been sent to, but not yet allocated by the
    +  // ApplicationMaster.
    +  private val numPendingAllocate = new AtomicInteger()
    +  private val numExecutorsRunning = new AtomicInteger()
    +  // Used to generate a unique id per executor
    +  private val executorIdCounter = new AtomicInteger()
    +  private val numExecutorsFailed = new AtomicInteger()
    +
    +  private val maxExecutors = args.numExecutors
    +
    +  protected val executorMemory = args.executorMemory
    +  protected val executorCores = args.executorCores
    +  protected val (preferredHostToCount, preferredRackToCount) =
    +    generateNodeToWeight(conf, preferredNodes)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +
    +  def allocateResources() = {
    +    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
    +
    +    if (missing > 0) {
    +      numPendingAllocate.addAndGet(missing)
    +      logInfo("Will Allocate %d executor containers, each with %d memory".format(
    +        missing,
    +        (executorMemory + memoryOverhead)))
    +    } else {
    +      logDebug("Empty allocation request ...")
    +    }
    +
    +    val allocateResponse = allocateContainers(missing)
    +    val allocatedContainers = allocateResponse.getAllocatedContainers()
    +
    +    if (allocatedContainers.size > 0) {
    +      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    +
    +      if (numPendingAllocateNow < 0) {
    +        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    --- End diff --
    
    Never mind... it is adding back -1 * (-x), which brings the counter back to 0.
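
The clamp discussed in this comment can be checked in isolation. The following is a minimal sketch, not the code from the pull request: `PendingCounterSketch` and `consume` are hypothetical names introduced for illustration, and only `java.util.concurrent.atomic.AtomicInteger` is a real API. It assumes a single caller, since the two `addAndGet` calls are not atomic as a pair.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper mirroring the two-step update quoted in the diff above.
object PendingCounterSketch {

  /** Subtracts `allocated` from `pending`. If the result drops below zero,
    * adds the deficit back so the counter ends at 0. Returns the final value.
    */
  def consume(pending: AtomicInteger, allocated: Int): Int = {
    var now = pending.addAndGet(-1 * allocated)
    if (now < 0) {
      // now == -x here, so -1 * now == x; adding x resets the counter to 0.
      now = pending.addAndGet(-1 * now)
    }
    now
  }
}
```

For example, with the counter at 2 and 5 containers allocated, the first call leaves -3 and the second call adds 3, so the counter ends at 0. When the counter is at least as large as the number of allocated containers, the second call is skipped.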




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-54295202
  
    Ran some tests against 0.23. +1, changes look good. Thanks @vanzin




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53916598
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19488/consoleFull) for PR 2169 at commit [`4dc9c83`](https://github.com/apache/spark/commit/4dc9c839338918266c2d31727f9b7b302c4b3c76).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse `
      * `  protected trait YarnAllocateResponse `
      * `  class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse `





[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16916951
  
    --- Diff: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---
    @@ -521,48 +203,24 @@ private[yarn] class YarnAllocationHandler(
         rsrcRequest
       }
     
    -  def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
    -
    +  private def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
         val retval = new ArrayBuffer[ContainerId](1)
         // Iterator on COW list ...
    -    for (container <- releasedContainerList.iterator()){
    +    for (container <- releaseList.iterator()){
           retval += container
         }
         // Remove from the original list.
    -    if (! retval.isEmpty) {
    -      releasedContainerList.removeAll(retval)
    -      for (v <- retval) pendingReleaseContainers.put(v, true)
    -      logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
    -        pendingReleaseContainers)
    +    if (!retval.isEmpty) {
    +      releaseList.removeAll(retval)
    +      logInfo("Releasing " + retval.size + " containers.")
         }
    -
         retval
       }
     
    -  // A simple method to copy the split info map.
    -  private def generateNodeToWeight(
    -    conf: Configuration,
    -    input: collection.Map[String, collection.Set[SplitInfo]]) :
    -  // host to count, rack to count
    -  (Map[String, Int], Map[String, Int]) = {
    -
    -    if (input == null) return (Map[String, Int](), Map[String, Int]())
    -
    -    val hostToCount = new HashMap[String, Int]
    -    val rackToCount = new HashMap[String, Int]
    -
    -    for ((host, splits) <- input) {
    -      val hostCount = hostToCount.getOrElse(host, 0)
    -      hostToCount.put(host, hostCount + splits.size)
    -
    -      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
    -      if (rack != null){
    -        val rackCount = rackToCount.getOrElse(host, 0)
    -        rackToCount.put(host, rackCount + splits.size)
    -      }
    -    }
    -
    -    (hostToCount.toMap, rackToCount.toMap)
    +  class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse {
    --- End diff --
    
    I think the only difference is that it is visible to all of yarn (since it's private[yarn]) vs. just making this inner class totally private. It's not a big deal.
    
    I'll give this a try on 0.23 once I finish testing the rc for 1.1.
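
The visibility trade-off mentioned above can be sketched roughly as follows. This is only an illustration, not code from the PR: the enclosing object `yarn` stands in for the real package, and `Allocator`, `PkgResponse`, `HiddenResponse` and `Neighbor` are hypothetical names.

```scala
// Sketch only: an enclosing object named `yarn` plays the role of the package.
object yarn {
  class Allocator {
    // Visible to everything nested inside `yarn`.
    private[yarn] class PkgResponse(val value: Int)

    // Visible only inside Allocator itself.
    private class HiddenResponse(val value: Int)

    // Allocator can still use its own private inner class.
    def hiddenValue: Int = new HiddenResponse(2).value
  }

  object Neighbor {
    // Compiles because PkgResponse is private[yarn];
    // `new a.HiddenResponse(1)` would be rejected here.
    def peek(a: Allocator): Int = new a.PkgResponse(1).value
  }
}
```

Either choice keeps the class out of reach of code outside `yarn`; the qualified form only widens access to neighbours in the same scope.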


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16898137
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -17,18 +17,430 @@
     
     package org.apache.spark.deploy.yarn
     
    +import java.util.{List => JList}
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +
     object AllocationType extends Enumeration {
       type AllocationType = Value
       val HOST, RACK, ANY = Value
     }
     
    +// TODO:
    +// Too many params.
    +// Needs to be mt-safe
    +// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
    +// make it more proactive and decoupled.
    +
    +// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
    +// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
    +// more info on how we are requesting for containers.
    +
     /**
    - * Interface that defines a Yarn allocator.
    + * Common code for the Yarn container allocator. Contains all the version-agnostic code to
    + * manage container allocation for a running Spark application.
      */
    -trait YarnAllocator {
    +private[yarn] abstract class YarnAllocator(
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    args: ApplicationMasterArguments,
    +    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
    +  extends Logging {
     
    -  def allocateResources(): Unit
    -  def getNumExecutorsFailed: Int
    -  def getNumExecutorsRunning: Int
    +  // These three are locked on allocatedHostToContainersMap. Complementary data structures
    +  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
    +  // allocatedContainerToHostMap: container to host mapping.
    +  private val allocatedHostToContainersMap =
    +    new HashMap[String, collection.mutable.Set[ContainerId]]()
     
    -}
    +  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
    +
    +  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
    +  // allocated node)
    +  // As with the two data structures above, tightly coupled with them, and to be locked on
    +  // allocatedHostToContainersMap
    +  private val allocatedRackCount = new HashMap[String, Int]()
    +
    +  // Containers to be released in next request to RM
    +  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
    +
    +  // Additional memory overhead - in mb.
    +  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    +    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    +
    +  // Number of container requests that have been sent to, but not yet allocated by the
    +  // ApplicationMaster.
    +  private val numPendingAllocate = new AtomicInteger()
    +  private val numExecutorsRunning = new AtomicInteger()
    +  // Used to generate a unique id per executor
    +  private val executorIdCounter = new AtomicInteger()
    +  private val numExecutorsFailed = new AtomicInteger()
    +
    +  private val maxExecutors = args.numExecutors
    +
    +  protected val executorMemory = args.executorMemory
    +  protected val executorCores = args.executorCores
    +  protected val (preferredHostToCount, preferredRackToCount) =
    +    generateNodeToWeight(conf, preferredNodes)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +
    +  def allocateResources() = {
    +    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
    +
    +    if (missing > 0) {
    +      numPendingAllocate.addAndGet(missing)
    +      logInfo("Will Allocate %d executor containers, each with %d memory".format(
    +        missing,
    +        (executorMemory + memoryOverhead)))
    +    } else {
    +      logDebug("Empty allocation request ...")
    +    }
    +
    +    val allocateResponse = allocateContainers(missing)
    +    val allocatedContainers = allocateResponse.getAllocatedContainers()
    +
    +    if (allocatedContainers.size > 0) {
    +      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    +
    +      if (numPendingAllocateNow < 0) {
    +        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    +      }
    +
    +      logDebug("""
    +        Allocated containers: %d
    +        Current executor count: %d
    +        Containers released: %s
    +        Cluster resources: %s
    +        """.format(
    +          allocatedContainers.size,
    +          numExecutorsRunning.get(),
    +          releasedContainers,
    +          allocateResponse.getAvailableResources))
    +
    +      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (container <- allocatedContainers) {
    +        if (isResourceConstraintSatisfied(container)) {
    +          // Add the accepted `container` to the host's list of already accepted,
    +          // allocated containers
    +          val host = container.getNodeId.getHost
    +          val containersForHost = hostToContainers.getOrElseUpdate(host,
    +            new ArrayBuffer[Container]())
    +          containersForHost += container
    +        } else {
    +          // Release container, since it doesn't satisfy resource constraints.
    +          (container)
    +        }
    +      }
    +
    +       // Find the appropriate containers to use.
    +      // TODO: Cleanup this group-by...
    +      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    +      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    +      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (candidateHost <- hostToContainers.keySet) {
    +        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
    +        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
    +
    +        val remainingContainersOpt = hostToContainers.get(candidateHost)
    +        assert(remainingContainersOpt.isDefined)
    +        var remainingContainers = remainingContainersOpt.get
    +
    +        if (requiredHostCount >= remainingContainers.size) {
    +          // Since we have <= required containers, add all remaining containers to
    +          // `dataLocalContainers`.
    +          dataLocalContainers.put(candidateHost, remainingContainers)
    +          // There are no more free containers remaining.
    +          remainingContainers = null
    +        } else if (requiredHostCount > 0) {
    +          // Container list has more containers than we need for data locality.
    +          // Split the list into two: one based on the data local container count,
    +          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    +          // containers.
    +          val (dataLocal, remaining) = remainingContainers.splitAt(
    +            remainingContainers.size - requiredHostCount)
    +          dataLocalContainers.put(candidateHost, dataLocal)
    +
    +          // Invariant: remainingContainers == remaining
    +
    +          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
    +          // Add each container in `remaining` to list of containers to release. If we have an
    +          // insufficient number of containers, then the next allocation cycle will reallocate
    +          // (but won't treat it as data local).
    +          // TODO(harvey): Rephrase this comment some more.
    +          for (container <- remaining) (container)
    --- End diff --
    
    Same here: missing call to releaseContainer?
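
For context on what ends up in `remaining` in the quoted hunk: `splitAt(n)` returns the first `n` elements and the rest. A small sketch, using plain integers as hypothetical container ids and a hypothetical `SplitSketch` helper:

```scala
object SplitSketch {
  // Mirrors the expression in the quoted hunk:
  // remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
  def split(containers: Vector[Int], requiredHostCount: Int): (Vector[Int], Vector[Int]) =
    containers.splitAt(containers.size - requiredHostCount)
}
```

With five ids and `requiredHostCount = 2`, the first half holds three ids and the second half (`remaining`, the part the loop is meant to hand back) holds two.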




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16897786
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -17,18 +17,430 @@
     
     package org.apache.spark.deploy.yarn
     
    +import java.util.{List => JList}
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +
     object AllocationType extends Enumeration {
       type AllocationType = Value
       val HOST, RACK, ANY = Value
     }
     
    +// TODO:
    +// Too many params.
    +// Needs to be mt-safe
    +// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
    +// make it more proactive and decoupled.
    +
    +// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
    +// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
    +// more info on how we are requesting for containers.
    +
     /**
    - * Interface that defines a Yarn allocator.
    + * Common code for the Yarn container allocator. Contains all the version-agnostic code to
    + * manage container allocation for a running Spark application.
      */
    -trait YarnAllocator {
    +private[yarn] abstract class YarnAllocator(
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    args: ApplicationMasterArguments,
    +    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
    +  extends Logging {
     
    -  def allocateResources(): Unit
    -  def getNumExecutorsFailed: Int
    -  def getNumExecutorsRunning: Int
    +  // These three are locked on allocatedHostToContainersMap. Complementary data structures
    +  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
    +  // allocatedContainerToHostMap: container to host mapping.
    +  private val allocatedHostToContainersMap =
    +    new HashMap[String, collection.mutable.Set[ContainerId]]()
     
    -}
    +  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
    +
    +  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
    +  // allocated node)
    +  // As with the two data structures above, tightly coupled with them, and to be locked on
    +  // allocatedHostToContainersMap
    +  private val allocatedRackCount = new HashMap[String, Int]()
    +
    +  // Containers to be released in next request to RM
    +  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
    +
    +  // Additional memory overhead - in mb.
    +  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    +    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    +
    +  // Number of container requests that have been sent to, but not yet allocated by the
    +  // ApplicationMaster.
    +  private val numPendingAllocate = new AtomicInteger()
    +  private val numExecutorsRunning = new AtomicInteger()
    +  // Used to generate a unique id per executor
    +  private val executorIdCounter = new AtomicInteger()
    +  private val numExecutorsFailed = new AtomicInteger()
    +
    +  private val maxExecutors = args.numExecutors
    +
    +  protected val executorMemory = args.executorMemory
    +  protected val executorCores = args.executorCores
    +  protected val (preferredHostToCount, preferredRackToCount) =
    +    generateNodeToWeight(conf, preferredNodes)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +
    +  def allocateResources() = {
    +    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
    +
    +    if (missing > 0) {
    +      numPendingAllocate.addAndGet(missing)
    +      logInfo("Will Allocate %d executor containers, each with %d memory".format(
    +        missing,
    +        (executorMemory + memoryOverhead)))
    +    } else {
    +      logDebug("Empty allocation request ...")
    +    }
    +
    +    val allocateResponse = allocateContainers(missing)
    +    val allocatedContainers = allocateResponse.getAllocatedContainers()
    +
    +    if (allocatedContainers.size > 0) {
    +      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    +
    +      if (numPendingAllocateNow < 0) {
    +        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    +      }
    +
    +      logDebug("""
    +        Allocated containers: %d
    +        Current executor count: %d
    +        Containers released: %s
    +        Cluster resources: %s
    +        """.format(
    +          allocatedContainers.size,
    +          numExecutorsRunning.get(),
    +          releasedContainers,
    +          allocateResponse.getAvailableResources))
    +
    +      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (container <- allocatedContainers) {
    +        if (isResourceConstraintSatisfied(container)) {
    +          // Add the accepted `container` to the host's list of already accepted,
    +          // allocated containers
    +          val host = container.getNodeId.getHost
    +          val containersForHost = hostToContainers.getOrElseUpdate(host,
    +            new ArrayBuffer[Container]())
    +          containersForHost += container
    +        } else {
    +          // Release container, since it doesn't satisfy resource constraints.
    +          (container)
    --- End diff --
    
    missing call to releaseContainer?
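
A minimal sketch of why this matters, assuming a hypothetical `FakeContainer` type and a local `releaseContainer` helper rather than the real YARN classes: a bare `(container)` is a valid Scala expression, so the code compiles, but the value is simply discarded and nothing is queued for release.

```scala
import scala.collection.mutable.ArrayBuffer

object ReleaseSketch {
  // Hypothetical stand-in for a YARN container; not the Hadoop type.
  final case class FakeContainer(id: Int, memoryMb: Int)

  // Returns (accepted, released). `callRelease = false` reproduces the quoted diff.
  def partition(
      containers: Seq[FakeContainer],
      minMb: Int,
      callRelease: Boolean): (Seq[FakeContainer], Seq[FakeContainer]) = {
    val accepted = ArrayBuffer.empty[FakeContainer]
    val released = ArrayBuffer.empty[FakeContainer]
    def releaseContainer(c: FakeContainer): Unit = released += c

    for (c <- containers) {
      if (c.memoryMb >= minMb) {
        accepted += c
      } else if (callRelease) {
        releaseContainer(c) // the call the comment asks about
      } else {
        (c) // as in the diff: evaluates to `c` and discards it
      }
    }
    (accepted.toSeq, released.toSeq)
  }
}
```

In this sketch an undersized container is neither accepted nor released when the call is missing, which is the situation the comment is pointing at.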




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53927734
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19491/consoleFull) for PR 2169 at commit [`46c2826`](https://github.com/apache/spark/commit/46c282656bc3726233150f1cf965d140f330aaa7).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  protected trait YarnAllocateResponse `





[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16898249
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -17,18 +17,430 @@
     
     package org.apache.spark.deploy.yarn
     
    +import java.util.{List => JList}
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +
     object AllocationType extends Enumeration {
       type AllocationType = Value
       val HOST, RACK, ANY = Value
     }
     
    +// TODO:
    +// Too many params.
    +// Needs to be mt-safe
    +// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
    +// make it more proactive and decoupled.
    +
    +// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
    +// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
    +// more info on how we are requesting for containers.
    +
     /**
    - * Interface that defines a Yarn allocator.
    + * Common code for the Yarn container allocator. Contains all the version-agnostic code to
    + * manage container allocation for a running Spark application.
      */
    -trait YarnAllocator {
    +private[yarn] abstract class YarnAllocator(
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    args: ApplicationMasterArguments,
    +    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
    +  extends Logging {
     
    -  def allocateResources(): Unit
    -  def getNumExecutorsFailed: Int
    -  def getNumExecutorsRunning: Int
    +  // These three are locked on allocatedHostToContainersMap. Complementary data structures
    +  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
    +  // allocatedContainerToHostMap: container to host mapping.
    +  private val allocatedHostToContainersMap =
    +    new HashMap[String, collection.mutable.Set[ContainerId]]()
     
    -}
    +  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
    +
    +  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
    +  // allocated node)
    +  // As with the two data structures above, tightly coupled with them, and to be locked on
    +  // allocatedHostToContainersMap
    +  private val allocatedRackCount = new HashMap[String, Int]()
    +
    +  // Containers to be released in next request to RM
    +  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
    +
    +  // Additional memory overhead - in mb.
    +  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    +    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    +
    +  // Number of container requests that have been sent to, but not yet allocated by the
    +  // ApplicationMaster.
    +  private val numPendingAllocate = new AtomicInteger()
    +  private val numExecutorsRunning = new AtomicInteger()
    +  // Used to generate a unique id per executor
    +  private val executorIdCounter = new AtomicInteger()
    +  private val numExecutorsFailed = new AtomicInteger()
    +
    +  private val maxExecutors = args.numExecutors
    +
    +  protected val executorMemory = args.executorMemory
    +  protected val executorCores = args.executorCores
    +  protected val (preferredHostToCount, preferredRackToCount) =
    +    generateNodeToWeight(conf, preferredNodes)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +
    +  def allocateResources() = {
    +    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
    +
    +    if (missing > 0) {
    +      numPendingAllocate.addAndGet(missing)
    +      logInfo("Will Allocate %d executor containers, each with %d memory".format(
    +        missing,
    +        (executorMemory + memoryOverhead)))
    +    } else {
    +      logDebug("Empty allocation request ...")
    +    }
    +
    +    val allocateResponse = allocateContainers(missing)
    +    val allocatedContainers = allocateResponse.getAllocatedContainers()
    +
    +    if (allocatedContainers.size > 0) {
    +      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    +
    +      if (numPendingAllocateNow < 0) {
    +        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    +      }
    +
    +      logDebug("""
    +        Allocated containers: %d
    +        Current executor count: %d
    +        Containers released: %s
    +        Cluster resources: %s
    +        """.format(
    +          allocatedContainers.size,
    +          numExecutorsRunning.get(),
    +          releasedContainers,
    +          allocateResponse.getAvailableResources))
    +
    +      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (container <- allocatedContainers) {
    +        if (isResourceConstraintSatisfied(container)) {
    +          // Add the accepted `container` to the host's list of already accepted,
    +          // allocated containers
    +          val host = container.getNodeId.getHost
    +          val containersForHost = hostToContainers.getOrElseUpdate(host,
    +            new ArrayBuffer[Container]())
    +          containersForHost += container
    +        } else {
    +          // Release container, since it doesn't satisfy resource constraints.
    +          (container)
    +        }
    +      }
    +
    +       // Find the appropriate containers to use.
    +      // TODO: Cleanup this group-by...
    +      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    +      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    +      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (candidateHost <- hostToContainers.keySet) {
    +        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
    +        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
    +
    +        val remainingContainersOpt = hostToContainers.get(candidateHost)
    +        assert(remainingContainersOpt.isDefined)
    +        var remainingContainers = remainingContainersOpt.get
    +
    +        if (requiredHostCount >= remainingContainers.size) {
    +          // Since we have <= required containers, add all remaining containers to
    +          // `dataLocalContainers`.
    +          dataLocalContainers.put(candidateHost, remainingContainers)
    +          // There are no more free containers remaining.
    +          remainingContainers = null
    +        } else if (requiredHostCount > 0) {
    +          // Container list has more containers than we need for data locality.
    +          // Split the list into two: one based on the data local container count,
    +          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    +          // containers.
    +          val (dataLocal, remaining) = remainingContainers.splitAt(
    +            remainingContainers.size - requiredHostCount)
    +          dataLocalContainers.put(candidateHost, dataLocal)
    +
    +          // Invariant: remainingContainers == remaining
    +
    +          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
    +          // Add each container in `remaining` to list of containers to release. If we have an
    +          // insufficient number of containers, then the next allocation cycle will reallocate
    +          // (but won't treat it as data local).
    +          // TODO(harvey): Rephrase this comment some more.
    +          for (container <- remaining) (container)
    +          remainingContainers = null
    +        }
    +
    +        // For rack local containers
    +        if (remainingContainers != null) {
    +          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
    +          if (rack != null) {
    +            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
    +            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
    +              rackLocalContainers.getOrElse(rack, List()).size
    +
    +            if (requiredRackCount >= remainingContainers.size) {
    +              // Add all remaining containers to to `dataLocalContainers`.
    +              dataLocalContainers.put(rack, remainingContainers)
    +              remainingContainers = null
    +            } else if (requiredRackCount > 0) {
    +              // Container list has more containers that we need for data locality.
    +              // Split the list into two: one based on the data local container count,
    +              // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    +              // containers.
    +              val (rackLocal, remaining) = remainingContainers.splitAt(
    +                remainingContainers.size - requiredRackCount)
    +              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
    +                new ArrayBuffer[Container]())
    +
    +              existingRackLocal ++= rackLocal
    +
    +              remainingContainers = remaining
    +            }
    +          }
    +        }
    +
    +        if (remainingContainers != null) {
    +          // Not all containers have been consumed - add them to the list of off-rack containers.
    +          offRackContainers.put(candidateHost, remainingContainers)
    +        }
    +      }
    +
    +      // Now that we have split the containers into various groups, go through them in order:
    +      // first host-local, then rack-local, and finally off-rack.
    +      // Note that the list we create below tries to ensure that not all containers end up within
    +      // a host if there is a sufficiently large number of hosts/containers.
    +      val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
    +      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
    +      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
    +      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
    +
    +      // Run each of the allocated containers.
    +      for (container <- allocatedContainersToProcess) {
    +        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
    +        val executorHostname = container.getNodeId.getHost
    +        val containerId = container.getId
    +
    +        val executorMemoryOverhead = (executorMemory + memoryOverhead)
    +        assert(container.getResource.getMemory >= executorMemoryOverhead)
    +
    +        if (numExecutorsRunningNow > maxExecutors) {
    +          logInfo("""Ignoring container %s at host %s, since we already have the required number of
    +            containers for it.""".format(containerId, executorHostname))
    --- End diff --
    
    Perhaps I'm missing how the release works now. Previously, the container was added to the list of containers to be released here.
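
The behaviour described in this comment could look roughly like the sketch below. It is an assumption-laden illustration, not the code from either side of the diff: integer ids stand in for `ContainerId`, `ExcessContainerSketch` is a hypothetical name, and the counter rollback on the excess path is an assumption of this sketch.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

object ExcessContainerSketch {
  // Returns (ids launched, ids queued for release).
  def process(containerIds: Seq[Int], maxExecutors: Int): (Seq[Int], Seq[Int]) = {
    val numExecutorsRunning = new AtomicInteger()
    val launched = ArrayBuffer.empty[Int]
    val toRelease = ArrayBuffer.empty[Int]

    for (id <- containerIds) {
      val runningNow = numExecutorsRunning.incrementAndGet()
      if (runningNow > maxExecutors) {
        // Over the limit: queue the container for release instead of only logging,
        // and undo the increment so the running count stays accurate (assumed).
        toRelease += id
        numExecutorsRunning.decrementAndGet()
      } else {
        launched += id
      }
    }
    (launched.toSeq, toRelease.toSeq)
  }
}
```

If the excess branch only logs, the surplus container in this sketch would stay allocated and still count as running.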




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16897811
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -17,18 +17,430 @@
     
     package org.apache.spark.deploy.yarn
     
    +import java.util.{List => JList}
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +
     object AllocationType extends Enumeration {
       type AllocationType = Value
       val HOST, RACK, ANY = Value
     }
     
    +// TODO:
    +// Too many params.
    +// Needs to be mt-safe
    +// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
    +// make it more proactive and decoupled.
    +
    +// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
    +// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
    +// more info on how we are requesting for containers.
    +
     /**
    - * Interface that defines a Yarn allocator.
    + * Common code for the Yarn container allocator. Contains all the version-agnostic code to
    + * manage container allocation for a running Spark application.
      */
    -trait YarnAllocator {
    +private[yarn] abstract class YarnAllocator(
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    args: ApplicationMasterArguments,
    +    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
    +  extends Logging {
     
    -  def allocateResources(): Unit
    -  def getNumExecutorsFailed: Int
    -  def getNumExecutorsRunning: Int
    +  // These three are locked on allocatedHostToContainersMap. Complementary data structures
    +  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
    +  // allocatedContainerToHostMap: container to host mapping.
    +  private val allocatedHostToContainersMap =
    +    new HashMap[String, collection.mutable.Set[ContainerId]]()
     
    -}
    +  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
    +
    +  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
    +  // allocated node)
    +  // As with the two data structures above, tightly coupled with them, and to be locked on
    +  // allocatedHostToContainersMap
    +  private val allocatedRackCount = new HashMap[String, Int]()
    +
    +  // Containers to be released in next request to RM
    +  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
    +
    +  // Additional memory overhead - in mb.
    +  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    +    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    +
    +  // Number of container requests that have been sent to, but not yet allocated by the
    +  // ApplicationMaster.
    +  private val numPendingAllocate = new AtomicInteger()
    +  private val numExecutorsRunning = new AtomicInteger()
    +  // Used to generate a unique id per executor
    +  private val executorIdCounter = new AtomicInteger()
    +  private val numExecutorsFailed = new AtomicInteger()
    +
    +  private val maxExecutors = args.numExecutors
    +
    +  protected val executorMemory = args.executorMemory
    +  protected val executorCores = args.executorCores
    +  protected val (preferredHostToCount, preferredRackToCount) =
    +    generateNodeToWeight(conf, preferredNodes)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +
    +  def allocateResources() = {
    +    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
    +
    +    if (missing > 0) {
    +      numPendingAllocate.addAndGet(missing)
    +      logInfo("Will Allocate %d executor containers, each with %d memory".format(
    +        missing,
    +        (executorMemory + memoryOverhead)))
    +    } else {
    +      logDebug("Empty allocation request ...")
    +    }
    +
    +    val allocateResponse = allocateContainers(missing)
    +    val allocatedContainers = allocateResponse.getAllocatedContainers()
    +
    +    if (allocatedContainers.size > 0) {
    +      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    +
    +      if (numPendingAllocateNow < 0) {
    +        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    +      }
    +
    +      logDebug("""
    +        Allocated containers: %d
    +        Current executor count: %d
    +        Containers released: %s
    +        Cluster resources: %s
    +        """.format(
    +          allocatedContainers.size,
    +          numExecutorsRunning.get(),
    +          releasedContainers,
    +          allocateResponse.getAvailableResources))
    +
    +      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (container <- allocatedContainers) {
    +        if (isResourceConstraintSatisfied(container)) {
    +          // Add the accepted `container` to the host's list of already accepted,
    +          // allocated containers
    +          val host = container.getNodeId.getHost
    +          val containersForHost = hostToContainers.getOrElseUpdate(host,
    +            new ArrayBuffer[Container]())
    +          containersForHost += container
    +        } else {
    +          // Release container, since it doesn't satisfy resource constraints.
    +          (container)
    +        }
    +      }
    +
    +       // Find the appropriate containers to use.
    +      // TODO: Cleanup this group-by...
    +      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    +      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    +      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (candidateHost <- hostToContainers.keySet) {
    +        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
    +        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
    +
    +        val remainingContainersOpt = hostToContainers.get(candidateHost)
    +        assert(remainingContainersOpt.isDefined)
    +        var remainingContainers = remainingContainersOpt.get
    +
    +        if (requiredHostCount >= remainingContainers.size) {
    +          // Since we have <= required containers, add all remaining containers to
    +          // `dataLocalContainers`.
    +          dataLocalContainers.put(candidateHost, remainingContainers)
    +          // There are no more free containers remaining.
    +          remainingContainers = null
    +        } else if (requiredHostCount > 0) {
    +          // Container list has more containers than we need for data locality.
    +          // Split the list into two: one based on the data local container count,
    +          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    +          // containers.
    +          val (dataLocal, remaining) = remainingContainers.splitAt(
    +            remainingContainers.size - requiredHostCount)
    +          dataLocalContainers.put(candidateHost, dataLocal)
    +
    +          // Invariant: remainingContainers == remaining
    +
    +          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
    +          // Add each container in `remaining` to list of containers to release. If we have an
    +          // insufficient number of containers, then the next allocation cycle will reallocate
    +          // (but won't treat it as data local).
    +          // TODO(harvey): Rephrase this comment some more.
    +          for (container <- remaining) (container)
    +          remainingContainers = null
    +        }
    +
    +        // For rack local containers
    +        if (remainingContainers != null) {
    +          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
    +          if (rack != null) {
    +            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
    +            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
    +              rackLocalContainers.getOrElse(rack, List()).size
    +
    +            if (requiredRackCount >= remainingContainers.size) {
    +              // Add all remaining containers to to `dataLocalContainers`.
    +              dataLocalContainers.put(rack, remainingContainers)
    +              remainingContainers = null
    +            } else if (requiredRackCount > 0) {
    +              // Container list has more containers that we need for data locality.
    +              // Split the list into two: one based on the data local container count,
    +              // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    +              // containers.
    +              val (rackLocal, remaining) = remainingContainers.splitAt(
    +                remainingContainers.size - requiredRackCount)
    +              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
    +                new ArrayBuffer[Container]())
    +
    +              existingRackLocal ++= rackLocal
    +
    +              remainingContainers = remaining
    +            }
    +          }
    +        }
    +
    +        if (remainingContainers != null) {
    +          // Not all containers have been consumed - add them to the list of off-rack containers.
    +          offRackContainers.put(candidateHost, remainingContainers)
    +        }
    +      }
    +
    +      // Now that we have split the containers into various groups, go through them in order:
    +      // first host-local, then rack-local, and finally off-rack.
    +      // Note that the list we create below tries to ensure that not all containers end up within
    +      // a host if there is a sufficiently large number of hosts/containers.
    +      val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
    +      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
    +      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
    +      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
    +
    +      // Run each of the allocated containers.
    +      for (container <- allocatedContainersToProcess) {
    +        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
    +        val executorHostname = container.getNodeId.getHost
    +        val containerId = container.getId
    +
    +        val executorMemoryOverhead = (executorMemory + memoryOverhead)
    +        assert(container.getResource.getMemory >= executorMemoryOverhead)
    +
    +        if (numExecutorsRunningNow > maxExecutors) {
    +          logInfo("""Ignoring container %s at host %s, since we already have the required number of
    +            containers for it.""".format(containerId, executorHostname))
    +          numExecutorsRunning.decrementAndGet()
    +        } else {
    +          val executorId = executorIdCounter.incrementAndGet().toString
    +          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
    +            SparkEnv.driverActorSystemName,
    +            sparkConf.get("spark.driver.host"),
    +            sparkConf.get("spark.driver.port"),
    +            CoarseGrainedSchedulerBackend.ACTOR_NAME)
    +
    +          logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
    +
    +          // To be safe, remove the container from `releasedContainers`.
    +          releasedContainers.remove(containerId)
    +
    +          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
    +          allocatedHostToContainersMap.synchronized {
    +            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
    +              new HashSet[ContainerId]())
    +
    +            containerSet += containerId
    +            allocatedContainerToHostMap.put(containerId, executorHostname)
    +
    +            if (rack != null) {
    +              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
    +            }
    +          }
    +          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
    +            driverUrl, executorHostname))
    +          val executorRunnable = new ExecutorRunnable(
    +            container,
    +            conf,
    +            sparkConf,
    +            driverUrl,
    +            executorId,
    +            executorHostname,
    +            executorMemory,
    +            executorCores)
    +          new Thread(executorRunnable).start()
    +        }
    +      }
    +      logDebug("""
    +        Finished allocating %s containers (from %s originally).
    +        Current number of executors running: %d,
    +        Released containers: %s
    +        """.format(
    +          allocatedContainersToProcess,
    +          allocatedContainers,
    +          numExecutorsRunning.get(),
    +          releasedContainers))
    +    }
    +
    +    val completedContainers = allocateResponse.getCompletedContainersStatuses()
    +    if (completedContainers.size > 0) {
    +      logDebug("Completed %d containers".format(completedContainers.size))
    +
    +      for (completedContainer <- completedContainers) {
    +        val containerId = completedContainer.getContainerId
    +
    +        if (releasedContainers.containsKey(containerId)) {
    +          // YarnAllocationHandler already marked the container for release, so remove it from
    +          // `releasedContainers`.
    +          releasedContainers.remove(containerId)
    +        } else {
    +          // Decrement the number of executors running. The next iteration of
    +          // the ApplicationMaster's reporting thread will take care of allocating.
    +          numExecutorsRunning.decrementAndGet()
    +          logInfo("Completed container %s (state: %s, exit status: %s)".format(
    +            containerId,
    +            completedContainer.getState,
    +            completedContainer.getExitStatus()))
    +          // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
    +          // there are some exit status' we shouldn't necessarily count against us, but for
    +          // now I think its ok as none of the containers are expected to exit
    +          if (completedContainer.getExitStatus() != 0) {
    +            logInfo("Container marked as failed: " + containerId)
    +            numExecutorsFailed.incrementAndGet()
    +          }
    +        }
    +
    +        allocatedHostToContainersMap.synchronized {
    +          if (allocatedContainerToHostMap.containsKey(containerId)) {
    +            val hostOpt = allocatedContainerToHostMap.get(containerId)
    +            assert(hostOpt.isDefined)
    +            val host = hostOpt.get
    +
    +            val containerSetOpt = allocatedHostToContainersMap.get(host)
    +            assert(containerSetOpt.isDefined)
    +            val containerSet = containerSetOpt.get
    +
    +            containerSet.remove(containerId)
    +            if (containerSet.isEmpty) {
    +              allocatedHostToContainersMap.remove(host)
    +            } else {
    +              allocatedHostToContainersMap.update(host, containerSet)
    +            }
    +
    +            allocatedContainerToHostMap.remove(containerId)
    +
    +            // TODO: Move this part outside the synchronized block?
    +            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
    +            if (rack != null) {
    +              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
    +              if (rackCount > 0) {
    +                allocatedRackCount.put(rack, rackCount)
    +              } else {
    +                allocatedRackCount.remove(rack)
    +              }
    +            }
    +          }
    +        }
    +      }
    +      logDebug("""
    +        Finished processing %d completed containers.
    +        Current number of executors running: %d,
    +        Released containers: %s
    +        """.format(
    +          completedContainers.size,
    +          numExecutorsRunning.get(),
    +          releasedContainers))
    +    }
    +  }
    +
    +  protected def allocatedContainersOnHost(host: String): Int = {
    +    var retval = 0
    +    allocatedHostToContainersMap.synchronized {
    +      retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
    +    }
    +    retval
    +  }
    +
    +  protected def allocatedContainersOnRack(rack: String): Int = {
    +    var retval = 0
    +    allocatedHostToContainersMap.synchronized {
    +      retval = allocatedRackCount.getOrElse(rack, 0)
    +    }
    +    retval
    +  }
    +
    +  private def isResourceConstraintSatisfied(container: Container): Boolean = {
    +    container.getResource.getMemory >= (executorMemory + memoryOverhead)
    +  }
    +
    +  // A simple method to copy the split info map.
    +  private def generateNodeToWeight(
    +      conf: Configuration,
    +      input: collection.Map[String, collection.Set[SplitInfo]]
    +    ): (Map[String, Int], Map[String, Int]) = {
    +
    +    if (input == null) {
    +      return (Map[String, Int](), Map[String, Int]())
    +    }
    +
    +    val hostToCount = new HashMap[String, Int]
    +    val rackToCount = new HashMap[String, Int]
    +
    +    for ((host, splits) <- input) {
    +      val hostCount = hostToCount.getOrElse(host, 0)
    +      hostToCount.put(host, hostCount + splits.size)
    +
    +      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
    +      if (rack != null) {
    +        val rackCount = rackToCount.getOrElse(host, 0)
    +        rackToCount.put(host, rackCount + splits.size)
    +      }
    +    }
    +
    +    (hostToCount.toMap, rackToCount.toMap)
    +  }
    +
    +  private def internalReleaseContainer(container: Container) = {
    --- End diff --
    
    This is never called?



[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53664874
  
    Test failures look unrelated. Something broken in master maybe?



[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16910507
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -17,18 +17,430 @@
     
     package org.apache.spark.deploy.yarn
     
    +import java.util.{List => JList}
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +
     object AllocationType extends Enumeration {
       type AllocationType = Value
       val HOST, RACK, ANY = Value
     }
     
    +// TODO:
    +// Too many params.
    +// Needs to be mt-safe
    +// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
    +// make it more proactive and decoupled.
    +
    +// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
    +// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
    +// more info on how we are requesting for containers.
    +
     /**
    - * Interface that defines a Yarn allocator.
    + * Common code for the Yarn container allocator. Contains all the version-agnostic code to
    + * manage container allocation for a running Spark application.
      */
    -trait YarnAllocator {
    +private[yarn] abstract class YarnAllocator(
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    args: ApplicationMasterArguments,
    +    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
    +  extends Logging {
     
    -  def allocateResources(): Unit
    -  def getNumExecutorsFailed: Int
    -  def getNumExecutorsRunning: Int
    +  // These three are locked on allocatedHostToContainersMap. Complementary data structures
    +  // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
    +  // allocatedContainerToHostMap: container to host mapping.
    +  private val allocatedHostToContainersMap =
    +    new HashMap[String, collection.mutable.Set[ContainerId]]()
     
    -}
    +  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
    +
    +  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
    +  // allocated node)
    +  // As with the two data structures above, tightly coupled with them, and to be locked on
    +  // allocatedHostToContainersMap
    +  private val allocatedRackCount = new HashMap[String, Int]()
    +
    +  // Containers to be released in next request to RM
    +  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
    +
    +  // Additional memory overhead - in mb.
    +  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    +    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    +
    +  // Number of container requests that have been sent to, but not yet allocated by the
    +  // ApplicationMaster.
    +  private val numPendingAllocate = new AtomicInteger()
    +  private val numExecutorsRunning = new AtomicInteger()
    +  // Used to generate a unique id per executor
    +  private val executorIdCounter = new AtomicInteger()
    +  private val numExecutorsFailed = new AtomicInteger()
    +
    +  private val maxExecutors = args.numExecutors
    +
    +  protected val executorMemory = args.executorMemory
    +  protected val executorCores = args.executorCores
    +  protected val (preferredHostToCount, preferredRackToCount) =
    +    generateNodeToWeight(conf, preferredNodes)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +
    +  def allocateResources() = {
    +    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
    +
    +    if (missing > 0) {
    +      numPendingAllocate.addAndGet(missing)
    +      logInfo("Will Allocate %d executor containers, each with %d memory".format(
    +        missing,
    +        (executorMemory + memoryOverhead)))
    +    } else {
    +      logDebug("Empty allocation request ...")
    +    }
    +
    +    val allocateResponse = allocateContainers(missing)
    +    val allocatedContainers = allocateResponse.getAllocatedContainers()
    +
    +    if (allocatedContainers.size > 0) {
    +      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    +
    +      if (numPendingAllocateNow < 0) {
    +        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    +      }
    +
    +      logDebug("""
    +        Allocated containers: %d
    +        Current executor count: %d
    +        Containers released: %s
    +        Cluster resources: %s
    +        """.format(
    +          allocatedContainers.size,
    +          numExecutorsRunning.get(),
    +          releasedContainers,
    +          allocateResponse.getAvailableResources))
    +
    +      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (container <- allocatedContainers) {
    +        if (isResourceConstraintSatisfied(container)) {
    +          // Add the accepted `container` to the host's list of already accepted,
    +          // allocated containers
    +          val host = container.getNodeId.getHost
    +          val containersForHost = hostToContainers.getOrElseUpdate(host,
    +            new ArrayBuffer[Container]())
    +          containersForHost += container
    +        } else {
    +          // Release container, since it doesn't satisfy resource constraints.
    +          (container)
    +        }
    +      }
    +
    +       // Find the appropriate containers to use.
    +      // TODO: Cleanup this group-by...
    +      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    +      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    +      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    +
    +      for (candidateHost <- hostToContainers.keySet) {
    +        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
    +        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
    +
    +        val remainingContainersOpt = hostToContainers.get(candidateHost)
    +        assert(remainingContainersOpt.isDefined)
    +        var remainingContainers = remainingContainersOpt.get
    +
    +        if (requiredHostCount >= remainingContainers.size) {
    +          // Since we have <= required containers, add all remaining containers to
    +          // `dataLocalContainers`.
    +          dataLocalContainers.put(candidateHost, remainingContainers)
    +          // There are no more free containers remaining.
    +          remainingContainers = null
    +        } else if (requiredHostCount > 0) {
    +          // Container list has more containers than we need for data locality.
    +          // Split the list into two: one based on the data local container count,
    +          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    +          // containers.
    +          val (dataLocal, remaining) = remainingContainers.splitAt(
    +            remainingContainers.size - requiredHostCount)
    +          dataLocalContainers.put(candidateHost, dataLocal)
    +
    +          // Invariant: remainingContainers == remaining
    +
    +          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
    +          // Add each container in `remaining` to list of containers to release. If we have an
    +          // insufficient number of containers, then the next allocation cycle will reallocate
    +          // (but won't treat it as data local).
    +          // TODO(harvey): Rephrase this comment some more.
    +          for (container <- remaining) (container)
    --- End diff --
    
    Hmm, search & replace fail.
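
For readers following along, the bare `(container)` in the diff above is a valid Scala expression whose value is simply discarded, so the loop releases nothing. The sketch below illustrates the difference. It assumes the intended call was `internalReleaseContainer(container)`, which the quoted diff defines but never invokes; the `ReleaseSketch` object, the string container ids, and the `released` set are hypothetical stand-ins, not the allocator's real types.

```scala
import scala.collection.mutable

object ReleaseSketch {
  // Returns (containers released after the no-op loop,
  //          containers released after the explicit call).
  def run(remaining: Seq[String]): (Int, Int) = {
    // Hypothetical stand-in for the allocator's release bookkeeping.
    val released = mutable.Set[String]()
    def internalReleaseContainer(id: String): Unit = released += id

    // As written in the diff: `(c)` evaluates to `c` and is thrown away.
    for (c <- remaining) (c)
    val afterNoOp = released.size

    // As the surrounding comment describes: actually release each one.
    for (c <- remaining) internalReleaseContainer(c)
    val afterCall = released.size

    (afterNoOp, afterCall)
  }
}
```

Because the no-op form still compiles, a compile-only check would not catch this kind of mistake.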



[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53915884
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19487/consoleFull) for PR 2169 at commit [`4dc9c83`](https://github.com/apache/spark/commit/4dc9c839338918266c2d31727f9b7b302c4b3c76).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse `
      * `  protected trait YarnAllocateResponse `
      * `  class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse `




[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2169#issuecomment-53716263
  
    @vanzin is this purely moving things around again or does it also subsume https://github.com/apache/spark/pull/655/?



[GitHub] spark pull request: [SPARK-3187] [yarn] Cleanup allocator code.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2169#discussion_r16916722
  
    --- Diff: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---
    @@ -521,48 +203,24 @@ private[yarn] class YarnAllocationHandler(
         rsrcRequest
       }
     
    -  def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
    -
    +  private def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
         val retval = new ArrayBuffer[ContainerId](1)
         // Iterator on COW list ...
    -    for (container <- releasedContainerList.iterator()){
    +    for (container <- releaseList.iterator()){
           retval += container
         }
         // Remove from the original list.
    -    if (! retval.isEmpty) {
    -      releasedContainerList.removeAll(retval)
    -      for (v <- retval) pendingReleaseContainers.put(v, true)
    -      logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
    -        pendingReleaseContainers)
    +    if (!retval.isEmpty) {
    +      releaseList.removeAll(retval)
    +      logInfo("Releasing " + retval.size + " containers.")
         }
    -
         retval
       }
     
    -  // A simple method to copy the split info map.
    -  private def generateNodeToWeight(
    -    conf: Configuration,
    -    input: collection.Map[String, collection.Set[SplitInfo]]) :
    -  // host to count, rack to count
    -  (Map[String, Int], Map[String, Int]) = {
    -
    -    if (input == null) return (Map[String, Int](), Map[String, Int]())
    -
    -    val hostToCount = new HashMap[String, Int]
    -    val rackToCount = new HashMap[String, Int]
    -
    -    for ((host, splits) <- input) {
    -      val hostCount = hostToCount.getOrElse(host, 0)
    -      hostToCount.put(host, hostCount + splits.size)
    -
    -      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
    -      if (rack != null){
    -        val rackCount = rackToCount.getOrElse(host, 0)
    -        rackToCount.put(host, rackCount + splits.size)
    -      }
    -    }
    -
    -    (hostToCount.toMap, rackToCount.toMap)
    +  class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse {
    --- End diff --
    
    I don't think anyone outside of this will need it. Should we make this and the Stable one private?
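
A minimal sketch of what that suggestion could look like, assuming the wrapper is only ever consumed through a common trait by the shared allocator code. Every name here (`FakeAlphaResponse`, `AllocatorSketch`, `AllocateResponseView`, `AlphaAllocatorSketch`, `AlphaView`) is a hypothetical stand-in; the real `AMResponse`, `YarnAllocateResponse` and `AlphaAllocateResponse` types from the PR are not reproduced.

```scala
// Hypothetical stand-in for the alpha-API response type.
final case class FakeAlphaResponse(allocated: List[String])

abstract class AllocatorSketch {
  // Version-agnostic view of a response; visible to subclasses only.
  protected trait AllocateResponseView {
    def allocatedContainers: List[String]
  }

  // Each version-specific allocator supplies its own wrapped response.
  protected def allocate(): AllocateResponseView

  // Shared logic only ever talks to the trait.
  def allocatedCount(): Int = allocate().allocatedContainers.size
}

class AlphaAllocatorSketch(raw: FakeAlphaResponse) extends AllocatorSketch {
  // Private: nothing outside this allocator needs to name the wrapper.
  private class AlphaView(r: FakeAlphaResponse) extends AllocateResponseView {
    override def allocatedContainers: List[String] = r.allocated
  }

  override protected def allocate(): AllocateResponseView = new AlphaView(raw)
}
```

Since the method's declared return type is the trait, the private wrapper class never leaks into the allocator's signature, which is what makes narrowing its visibility safe in this sketch.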

