Posted to reviews@spark.apache.org by sryza <gi...@git.apache.org> on 2014/05/06 02:03:39 UTC

[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

GitHub user sryza opened a pull request:

    https://github.com/apache/spark/pull/655

    SPARK-1714. Take advantage of AMRMClient APIs to simplify logic in YarnA...

    ...llocationHandler
    
    This patch does a few things:
    * Uses AMRMClient APIs for matching containers to requests.
    * Calls AMRMClient.removeContainerRequest so that, when we use a container, we don't end up requesting it again (see the sketch at the end of this description).
    * Removes YarnAllocationHandler's host->rack cache.  YARN's RackResolver already does this caching, so this is redundant.
    * Adds tests for basic YarnAllocationHandler functionality.
    * Breaks up allocateResources.
    * A little bit of stylistic cleanup.
    
    
    The patch is lossy.  In particular, it loses the logic for trying to avoid containers bunching up on nodes.  As I understand it, the logic that's gone is two-part:
    * If, in a single response from the RM, we receive a set of containers on a node, and prefer some number of containers on that node greater than 0 but less than the number we received, give back the delta between what we preferred and what we received.
    
    This seems like a weird way to avoid bunching. E.g., it does nothing to avoid bunching when we don't request containers on particular nodes. I think we can come up with something better.
    * If we receive more containers than the number of executors we desire, make sure that the containers we use are distributed as evenly as possible among the available nodes.
    
    It's rare for YARN to allocate more containers than requested, and when it does, it's only a small number.  In fact, with the current code, because we call allocate() and handle allocations in the same thread, it should never happen.  Having a bunch of logic to deal with this seems unnecessary.
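    
    For reference, here's a rough sketch (not code from this patch) of the AMRMClient request/match/remove cycle that the first two bullets rely on, against the Hadoop 2.2+ stable API. The memory size, core count, priority, and "host1" below are placeholder values:
    
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.yarn.api.records.{Priority, Resource}
        import org.apache.hadoop.yarn.client.api.AMRMClient
        import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    
        val amClient: AMRMClient[ContainerRequest] = AMRMClient.createAMRMClient()
        amClient.init(new Configuration())
        amClient.start()
    
        // Queue a request for one container on a preferred host (racks and
        // relaxLocality are omitted for brevity).
        val resource = Resource.newInstance(2048, 1)
        val priority = Priority.newInstance(1)
        amClient.addContainerRequest(new ContainerRequest(resource, Array("host1"), null, priority))
    
        // Later, when allocate() hands back a container on host1, look up the
        // outstanding request it satisfies and remove it so the RM doesn't keep
        // granting it on every heartbeat.
        val matching = amClient.getMatchingRequests(priority, "host1", resource)
        if (!matching.isEmpty && !matching.get(0).isEmpty) {
          amClient.removeContainerRequest(matching.get(0).iterator().next())
        }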

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sryza/spark sandy-spark-1714

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #655
    
----
commit 6bf2b4854cad7e4e6d9f42db7d61fc17a5a3cec9
Author: Sandy Ryza <sa...@cloudera.com>
Date:   2014-04-25T17:00:35Z

    SPARK-1714. Take advantage of AMRMClient APIs to simplify logic in YarnAllocationHandler

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-42257629
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-42255465
  
    Merged build started. 



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-44567239
  
    Merged build started. 



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-42257632
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14688/



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-44572185
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-54780393
  
    Unfortunately the cleanup refactored a bunch of common code between yarn-alpha and yarn-stable that would no longer have been common after this patch (because, after 2.2, YARN provides APIs that handle most of the complexity). If we're planning to drop support for yarn-alpha sometime soon, it might be easier for this to wait on that?



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-45571300
  
    I tried testing this manually today and found that the locality stuff doesn't work even without this patch - https://issues.apache.org/jira/browse/SPARK-2089.



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-45254365
  
    cc @mridul as he wrote most of this code.



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-42259212
  
    Looks OK to me; I'll test this out separately (to see if it has any effect on the tests I'm currently running).



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-44567226
  
     Merged build triggered. 



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/655#discussion_r12307994
  
    --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---
    @@ -105,278 +96,222 @@ private[yarn] class YarnAllocationHandler(
     
       def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
     
    -  def isResourceConstraintSatisfied(container: Container): Boolean = {
    -    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
    -  }
    -
       def releaseContainer(container: Container) {
         val containerId = container.getId
    -    pendingReleaseContainers.put(containerId, true)
         amClient.releaseAssignedContainer(containerId)
       }
     
    +  /**
    +   * Heartbeat to the ResourceManager. Passes along any ContainerRequests we've added to the
    +   * AMRMClient. If there are no pending requests, lets the ResourceManager know we're still alive.
    +   */
       def allocateResources() {
    -    // We have already set the container request. Poll the ResourceManager for a response.
    -    // This doubles as a heartbeat if there are no pending container requests.
         val progressIndicator = 0.1f
         val allocateResponse = amClient.allocate(progressIndicator)
     
         val allocatedContainers = allocateResponse.getAllocatedContainers()
         if (allocatedContainers.size > 0) {
    -      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    -
    -      if (numPendingAllocateNow < 0) {
    -        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    -      }
    -
           logDebug("""
             Allocated containers: %d
             Current executor count: %d
             Containers released: %s
    -        Containers to-be-released: %s
             Cluster resources: %s
    +               """.format(
    +        allocatedContainers.size,
    +        numExecutorsRunning.get(),
    +        releasedContainerList,
    +        allocateResponse.getAvailableResources))
    +
    +      handleAllocatedContainers(allocatedContainers)
    +    }
    +
    +    val completedContainers = allocateResponse.getCompletedContainersStatuses()
    +    if (completedContainers.size > 0) {
    +      logDebug("Completed %d containers".format(completedContainers.size))
    +
    +      processCompletedContainers(completedContainers)
    +
    +      logDebug("""
    +        Finished processing %d completed containers.
    +        Current number of executors running: %d,
    +        releasedContainerList: %s,
             """.format(
    -          allocatedContainers.size,
    +          completedContainers.size,
               numExecutorsRunning.get(),
    -          releasedContainerList,
    -          pendingReleaseContainers,
    -          allocateResponse.getAvailableResources))
    -
    -      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    -
    -      for (container <- allocatedContainers) {
    -        if (isResourceConstraintSatisfied(container)) {
    -          // Add the accepted `container` to the host's list of already accepted,
    -          // allocated containers
    -          val host = container.getNodeId.getHost
    -          val containersForHost = hostToContainers.getOrElseUpdate(host,
    -            new ArrayBuffer[Container]())
    -          containersForHost += container
    -        } else {
    -          // Release container, since it doesn't satisfy resource constraints.
    -          releaseContainer(container)
    -        }
    +          releasedContainerList))
    +    }
    +  }
    +
    +  def handleAllocatedContainers(allocatedContainers: Seq[Container]) {
    +    val numPendingAllocateNow = numPendingAllocate.addAndGet(-allocatedContainers.size)
    +
    +    if (numPendingAllocateNow < 0) {
    +      numPendingAllocate.addAndGet(-numPendingAllocateNow)
    +    }
    +
    +    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
    +
    +    // Match incoming requests by host
    +    val remainingAfterHostMatches = new ArrayBuffer[Container]()
    +    for (allocatedContainer <- allocatedContainers) {
    +      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
    +        containersToUse, remainingAfterHostMatches)
    +    }
    +
    +    // Match remaining by rack
    +    val remainingAfterRackMatches = new ArrayBuffer[Container]()
    +    for (allocatedContainer <- remainingAfterHostMatches) {
    +      val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
    +      matchContainerToRequest(allocatedContainer, rack, containersToUse,
    +        remainingAfterRackMatches)
    +    }
    +
    +    // Assign remaining that are neither node-local nor rack-local
    +    val remainingAfterOffRackMatches = new ArrayBuffer[Container]()
    +    for (allocatedContainer <- remainingAfterRackMatches) {
    +      matchContainerToRequest(allocatedContainer, "*", containersToUse,
    +        remainingAfterOffRackMatches)
    +    }
    +
    +    if (!remainingAfterOffRackMatches.isEmpty) {
    +      logWarning("Received containers that did not satisfy resource constraints: "
    +        + remainingAfterOffRackMatches)
    +      for (container <- remainingAfterOffRackMatches) {
    +        amClient.releaseAssignedContainer(container.getId)
           }
    +    }
     
    -       // Find the appropriate containers to use.
    -      // TODO: Cleanup this group-by...
    -      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    -      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    -      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    -
    -      for (candidateHost <- hostToContainers.keySet) {
    -        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
    -        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
    -
    -        val remainingContainersOpt = hostToContainers.get(candidateHost)
    -        assert(remainingContainersOpt.isDefined)
    -        var remainingContainers = remainingContainersOpt.get
    -
    -        if (requiredHostCount >= remainingContainers.size) {
    -          // Since we have <= required containers, add all remaining containers to
    -          // `dataLocalContainers`.
    -          dataLocalContainers.put(candidateHost, remainingContainers)
    -          // There are no more free containers remaining.
    -          remainingContainers = null
    -        } else if (requiredHostCount > 0) {
    -          // Container list has more containers than we need for data locality.
    -          // Split the list into two: one based on the data local container count,
    -          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    -          // containers.
    -          val (dataLocal, remaining) = remainingContainers.splitAt(
    -            remainingContainers.size - requiredHostCount)
    -          dataLocalContainers.put(candidateHost, dataLocal)
    -
    -          // Invariant: remainingContainers == remaining
    -
    -          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
    -          // Add each container in `remaining` to list of containers to release. If we have an
    -          // insufficient number of containers, then the next allocation cycle will reallocate
    -          // (but won't treat it as data local).
    -          // TODO(harvey): Rephrase this comment some more.
    -          for (container <- remaining) releaseContainer(container)
    -          remainingContainers = null
    -        }
    +    runAllocatedContainers(containersToUse)
    +
    +    logDebug("""
    +        Finished allocating %s containers (from %s originally).
    +        Current number of executors running: %d,
    +        releasedContainerList: %s,
    +             """.format(
    +      containersToUse,
    +      allocatedContainers,
    +      numExecutorsRunning.get(),
    +      releasedContainerList))
    +  }
    +
    +  def runAllocatedContainers(containersToUse: ArrayBuffer[Container]) {
    +    for (container <- containersToUse) {
    +      val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
    +      val executorHostname = container.getNodeId.getHost
    +      val containerId = container.getId
    +
    +      val executorMemoryWithOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
    --- End diff --
    
    nit: parentheses unnecessary?



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-42255453
  
     Merged build triggered. 



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-54877812
  
    Hopefully not much longer, but we need to have an official proposal and make sure others aren't using it. So I'm fine with waiting.





[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-44572188
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15292/



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-54307385
  
    Hey @sryza. We put in some cleanup JIRAs; would you be able to upmerge this?





[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-61993923
  
    @sryza can we just close this out until further work is done?





[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/655#discussion_r13227671
  
    --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---
    @@ -105,278 +96,222 @@ private[yarn] class YarnAllocationHandler(
     
       def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
     
    -  def isResourceConstraintSatisfied(container: Container): Boolean = {
    -    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
    -  }
    -
       def releaseContainer(container: Container) {
         val containerId = container.getId
    -    pendingReleaseContainers.put(containerId, true)
         amClient.releaseAssignedContainer(containerId)
       }
     
    +  /**
    +   * Heartbeat to the ResourceManager. Passes along any ContainerRequests we've added to the
    +   * AMRMClient. If there are no pending requests, lets the ResourceManager know we're still alive.
    +   */
       def allocateResources() {
    -    // We have already set the container request. Poll the ResourceManager for a response.
    -    // This doubles as a heartbeat if there are no pending container requests.
         val progressIndicator = 0.1f
         val allocateResponse = amClient.allocate(progressIndicator)
     
         val allocatedContainers = allocateResponse.getAllocatedContainers()
         if (allocatedContainers.size > 0) {
    -      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    -
    -      if (numPendingAllocateNow < 0) {
    -        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    -      }
    -
           logDebug("""
             Allocated containers: %d
             Current executor count: %d
             Containers released: %s
    -        Containers to-be-released: %s
             Cluster resources: %s
    +               """.format(
    +        allocatedContainers.size,
    +        numExecutorsRunning.get(),
    +        releasedContainerList,
    +        allocateResponse.getAvailableResources))
    +
    +      handleAllocatedContainers(allocatedContainers)
    +    }
    +
    +    val completedContainers = allocateResponse.getCompletedContainersStatuses()
    +    if (completedContainers.size > 0) {
    +      logDebug("Completed %d containers".format(completedContainers.size))
    +
    +      processCompletedContainers(completedContainers)
    +
    +      logDebug("""
    +        Finished processing %d completed containers.
    +        Current number of executors running: %d,
    +        releasedContainerList: %s,
             """.format(
    -          allocatedContainers.size,
    +          completedContainers.size,
               numExecutorsRunning.get(),
    -          releasedContainerList,
    -          pendingReleaseContainers,
    -          allocateResponse.getAvailableResources))
    -
    -      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    -
    -      for (container <- allocatedContainers) {
    -        if (isResourceConstraintSatisfied(container)) {
    -          // Add the accepted `container` to the host's list of already accepted,
    -          // allocated containers
    -          val host = container.getNodeId.getHost
    -          val containersForHost = hostToContainers.getOrElseUpdate(host,
    -            new ArrayBuffer[Container]())
    -          containersForHost += container
    -        } else {
    -          // Release container, since it doesn't satisfy resource constraints.
    -          releaseContainer(container)
    -        }
    +          releasedContainerList))
    +    }
    +  }
    +
    +  def handleAllocatedContainers(allocatedContainers: Seq[Container]) {
    +    val numPendingAllocateNow = numPendingAllocate.addAndGet(-allocatedContainers.size)
    +
    +    if (numPendingAllocateNow < 0) {
    +      numPendingAllocate.addAndGet(-numPendingAllocateNow)
    +    }
    +
    +    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
    +
    +    // Match incoming requests by host
    +    val remainingAfterHostMatches = new ArrayBuffer[Container]()
    +    for (allocatedContainer <- allocatedContainers) {
    +      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
    +        containersToUse, remainingAfterHostMatches)
    +    }
    +
    +    // Match remaining by rack
    +    val remainingAfterRackMatches = new ArrayBuffer[Container]()
    +    for (allocatedContainer <- remainingAfterHostMatches) {
    +      val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
    +      matchContainerToRequest(allocatedContainer, rack, containersToUse,
    +        remainingAfterRackMatches)
    +    }
    +
    +    // Assign remaining that are neither node-local nor rack-local
    +    val remainingAfterOffRackMatches = new ArrayBuffer[Container]()
    +    for (allocatedContainer <- remainingAfterRackMatches) {
    +      matchContainerToRequest(allocatedContainer, "*", containersToUse,
    +        remainingAfterOffRackMatches)
    +    }
    +
    +    if (!remainingAfterOffRackMatches.isEmpty) {
    +      logWarning("Received containers that did not satisfy resource constraints: "
    +        + remainingAfterOffRackMatches)
    +      for (container <- remainingAfterOffRackMatches) {
    +        amClient.releaseAssignedContainer(container.getId)
           }
    +    }
     
    -       // Find the appropriate containers to use.
    -      // TODO: Cleanup this group-by...
    -      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    -      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    -      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    -
    -      for (candidateHost <- hostToContainers.keySet) {
    -        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
    -        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
    -
    -        val remainingContainersOpt = hostToContainers.get(candidateHost)
    -        assert(remainingContainersOpt.isDefined)
    -        var remainingContainers = remainingContainersOpt.get
    -
    -        if (requiredHostCount >= remainingContainers.size) {
    -          // Since we have <= required containers, add all remaining containers to
    -          // `dataLocalContainers`.
    -          dataLocalContainers.put(candidateHost, remainingContainers)
    -          // There are no more free containers remaining.
    -          remainingContainers = null
    -        } else if (requiredHostCount > 0) {
    -          // Container list has more containers than we need for data locality.
    -          // Split the list into two: one based on the data local container count,
    -          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    -          // containers.
    -          val (dataLocal, remaining) = remainingContainers.splitAt(
    -            remainingContainers.size - requiredHostCount)
    -          dataLocalContainers.put(candidateHost, dataLocal)
    -
    -          // Invariant: remainingContainers == remaining
    -
    -          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
    -          // Add each container in `remaining` to list of containers to release. If we have an
    -          // insufficient number of containers, then the next allocation cycle will reallocate
    -          // (but won't treat it as data local).
    -          // TODO(harvey): Rephrase this comment some more.
    -          for (container <- remaining) releaseContainer(container)
    -          remainingContainers = null
    -        }
    +    runAllocatedContainers(containersToUse)
    +
    +    logDebug("""
    +        Finished allocating %s containers (from %s originally).
    +        Current number of executors running: %d,
    +        releasedContainerList: %s,
    +             """.format(
    +      containersToUse,
    +      allocatedContainers,
    +      numExecutorsRunning.get(),
    +      releasedContainerList))
    +  }
    +
    +  def runAllocatedContainers(containersToUse: ArrayBuffer[Container]) {
    +    for (container <- containersToUse) {
    +      val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
    +      val executorHostname = container.getNodeId.getHost
    +      val containerId = container.getId
    +
    +      val executorMemoryWithOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
    +      assert(container.getResource.getMemory >= executorMemoryWithOverhead)
    +
    +      if (numExecutorsRunningNow > maxExecutors) {
    +        logInfo("""Ignoring container %s at host %s, since we already have the required number of
    +            containers.""".format(containerId, executorHostname))
    +        amClient.releaseAssignedContainer(container.getId)
    +        numExecutorsRunning.decrementAndGet()
    +      } else {
    +        val executorId = executorIdCounter.incrementAndGet().toString
    +        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
    +          sparkConf.get("spark.driver.host"),
    +          sparkConf.get("spark.driver.port"),
    +          CoarseGrainedSchedulerBackend.ACTOR_NAME)
    +
    +        logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
    +
    +        val rack = RackResolver.resolve(conf, executorHostname).getNetworkLocation
    +        allocatedHostToContainersMap.synchronized {
    +          val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
    +            new HashSet[ContainerId]())
    +
    +          containerSet += containerId
    +          allocatedContainerToHostMap.put(containerId, executorHostname)
     
    -        // For rack local containers
    -        if (remainingContainers != null) {
    -          val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
               if (rack != null) {
    -            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
    -            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
    -              rackLocalContainers.getOrElse(rack, List()).size
    -
    -            if (requiredRackCount >= remainingContainers.size) {
    -              // Add all remaining containers to to `dataLocalContainers`.
    -              dataLocalContainers.put(rack, remainingContainers)
    -              remainingContainers = null
    -            } else if (requiredRackCount > 0) {
    -              // Container list has more containers that we need for data locality.
    -              // Split the list into two: one based on the data local container count,
    -              // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    -              // containers.
    -              val (rackLocal, remaining) = remainingContainers.splitAt(
    -                remainingContainers.size - requiredRackCount)
    -              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
    -                new ArrayBuffer[Container]())
    -
    -              existingRackLocal ++= rackLocal
    -
    -              remainingContainers = remaining
    -            }
    +            allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
               }
             }
    -
    -        if (remainingContainers != null) {
    -          // Not all containers have been consumed - add them to the list of off-rack containers.
    -          offRackContainers.put(candidateHost, remainingContainers)
    -        }
    +        logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
    --- End diff --
    
    Two spaces: `driverUrl: %s,  executorHostname`



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/655#issuecomment-45270390
  
    @sryza did you do any manual testing for locality with this?



[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by sryza <gi...@git.apache.org>.
Github user sryza closed the pull request at:

    https://github.com/apache/spark/pull/655





[GitHub] spark pull request: SPARK-1714. Take advantage of AMRMClient APIs ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/655#discussion_r12308167
  
    --- Diff: yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocationHandlerSuite.scala ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import org.scalatest.FunSuite
    +import org.scalatest.BeforeAndAfterEach
    +import org.scalatest.matchers.ShouldMatchers
    +
    +import org.apache.hadoop.yarn.client.api.AMRMClient
    +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    +import org.apache.hadoop.conf.Configuration
    +
    +import org.apache.spark.SparkConf
    +import org.apache.hadoop.net.DNSToSwitchMapping
    +
    +import java.util.{List => JList}
    +import java.util.Arrays
    +import org.apache.hadoop.fs.CommonConfigurationKeysPublic
    +import org.apache.hadoop.yarn.api.records._
    +
    +class MyResolver extends DNSToSwitchMapping {
    +
    +  override def resolve(names: JList[String]): JList[String] = {
    +    if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
    +    else Arrays.asList("/rack1")
    +  }
    +
    +  override def reloadCachedMappings() {}
    +
    +  def reloadCachedMappings(names: JList[String]) {}
    +}
    +
    +class YarnAllocationHandlerSuite extends FunSuite with ShouldMatchers with BeforeAndAfterEach {
    +  val conf = new Configuration()
    +  conf.setClass(
    +    CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
    +    classOf[MyResolver], classOf[DNSToSwitchMapping])
    +
    +  val sparkConf = new SparkConf()
    +  sparkConf.set("spark.driver.host", "localhost")
    +  sparkConf.set("spark.driver.port", "4040")
    +
    +  var rmClient: AMRMClient[ContainerRequest] = _
    +
    +  var containerNum = 0
    +
    +  val CONTAINER_RESOURCE = Resource.newInstance(2048 + YarnAllocationHandler.MEMORY_OVERHEAD, 6)
    +
    +  override def beforeEach() {
    +    rmClient = AMRMClient.createAMRMClient()
    +    rmClient.init(conf)
    +    rmClient.start()
    +
    +    // rmClient.registerApplicationMaster("Host", 10000, "")
    --- End diff --
    
    drop this code? or comment?

