Posted to issues@spark.apache.org by "Marcelo Vanzin (JIRA)" <ji...@apache.org> on 2017/07/28 17:36:00 UTC

[jira] [Resolved] (SPARK-21562) Spark may request extra containers if the rpc between YARN and spark is too fast

     [ https://issues.apache.org/jira/browse/SPARK-21562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marcelo Vanzin resolved SPARK-21562.
------------------------------------
    Resolution: Duplicate

> Spark may request extra containers if the rpc between YARN and spark is too fast
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-21562
>                 URL: https://issues.apache.org/jira/browse/SPARK-21562
>             Project: Spark
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 2.2.0
>            Reporter: Wei Chen
>              Labels: YARN
>
> Hi guys,
> I found an interesting problem when Spark requests containers from YARN.
> Here is the case:
> In YarnAllocator.scala:
> 1. This function requests containers from YARN only if some executors have not been requested yet.
> {code:java}
> def updateResourceRequests(): Unit = {
>     val pendingAllocate = getPendingAllocate
>     val numPendingAllocate = pendingAllocate.size
>     val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
>   
>     if (missing > 0) {
>  ......
>     }
>   .....
> }
> {code}
> 2. After the requested containers are allocated (granted through RPC), it updates the pending queue.
>   
> {code:java}
> private def matchContainerToRequest(
>       allocatedContainer: Container,
>       location: String,
>       containersToUse: ArrayBuffer[Container],
>       remaining: ArrayBuffer[Container]): Unit = {
>       .....
>      
>    amClient.removeContainerRequest(containerRequest) //update pending queues
>    
>    .....
> }
> {code}
> 3. After the allocated containers are launched, it will update the running queue
> {code:java}
> private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
>     for (container <- containersToUse) {
>      ....
>     launcherPool.execute(new Runnable {
>             override def run(): Unit = {
>               try {
>                 new ExecutorRunnable(
>                   Some(container),
>                   conf,
>                   sparkConf,
>                   driverUrl,
>                   executorId,
>                   executorHostname,
>                   executorMemory,
>                   executorCores,
>                   appAttemptId.getApplicationId.toString,
>                   securityMgr,
>                   localResources
>                 ).run()
>                 logInfo(s"has launched $containerId")
>                 updateInternalState()   //update running queues
>      ....
>       
> } 
> }
> {code}
> However, step 3 starts a thread that first launches the ExecutorRunnable and only then updates the running queue. We found that it can take almost 1 second before updateInternalState() is called. This leaves an inconsistent window: the pending queue has already been updated, but the running queue has not, because the launcher thread has not reached updateInternalState() yet. If an RPC call to amClient.allocate() happens during this window, more executors than targetNumExecutors will be requested.
> {noformat}
> Here is an example:
> Initial:
> targetNumExecutors    numPendingAllocate          numExecutorsRunning
> 1                     0                           0
> After the first RPC call to amClient.allocate():
> targetNumExecutors    numPendingAllocate          numExecutorsRunning
> 1                     1                           0
> After the first allocated container is granted by YARN:
> targetNumExecutors    numPendingAllocate          numExecutorsRunning
> 1                     0 (removed in step 2)       0
> =====> If there is an RPC call to amClient.allocate() here, more containers are requested,
> but this is caused only by the inconsistent state.
> After the container is launched in step 3:
> targetNumExecutors    numPendingAllocate          numExecutorsRunning
> 1                     0                           1
> {noformat}
> =======================================================================
> I found this problem because I am changing requestType to test some features of YARN's opportunistic containers (e.g., allocation takes 100 ms), which are much faster than guaranteed containers (e.g., allocation takes almost 1 s).
> I am not sure whether my understanding is correct.
> I would appreciate anyone's help with this issue (please correct me if I have misunderstood).
> Wei
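
The accounting gap described in the issue can be walked through with a small deterministic model. This is only a sketch under stated assumptions, not the YarnAllocator code: AllocatorSketch, AllocatorState and its methods are hypothetical names introduced for illustration, the three counters stand in for targetNumExecutors, numPendingAllocate and numExecutorsRunning, and the launcher thread is replaced by an explicit step so the interleaving is fixed.

```scala
// Simplified, single-threaded model of the counters described in the issue.
// AllocatorState and its methods are hypothetical names, not Spark APIs.
object AllocatorSketch {
  final case class AllocatorState(target: Int, pending: Int, running: Int) {
    // Mirrors: missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
    def missing: Int = target - pending - running

    // Step 1: request containers only when some executors are unaccounted for.
    def requestMissing: AllocatorState =
      if (missing > 0) copy(pending = pending + missing) else this

    // Step 2: YARN grants a container, so its request leaves the pending count.
    def containerGranted: AllocatorState = copy(pending = pending - 1)

    // Step 3: the launcher thread finishes and the running count is updated.
    def executorLaunched: AllocatorState = copy(running = running + 1)
  }

  def main(args: Array[String]): Unit = {
    val initial = AllocatorState(target = 1, pending = 0, running = 0)
    val requested = initial.requestMissing   // pending = 1
    val granted = requested.containerGranted // pending = 0, running = 0
    // An allocate call inside this window sees missing == 1 and requests again.
    val extra = granted.requestMissing
    println(s"missing in the window: ${granted.missing}")
    println(s"pending after second request: ${extra.pending}")
    val launched = extra.executorLaunched
    println(s"accounted for: ${launched.pending + launched.running}, target: ${launched.target}")
  }
}
```

In this model the state after the grant but before the launch reports one missing executor, so a second request is issued and the application ends up with one running executor plus one pending request against a target of 1. Whether the real allocator hits this window depends on thread timing, which the sketch deliberately leaves out.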


