Posted to issues@spark.apache.org by "Thomas Graves (JIRA)" <ji...@apache.org> on 2019/04/19 20:29:00 UTC

[jira] [Comment Edited] (SPARK-27495) Support Stage level resource configuration and scheduling

    [ https://issues.apache.org/jira/browse/SPARK-27495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16822166#comment-16822166 ] 

Thomas Graves edited comment on SPARK-27495 at 4/19/19 8:28 PM:
----------------------------------------------------------------

Unfortunately the link to the original design doc was removed from https://issues.apache.org/jira/browse/SPARK-24615, but for reference there were some good discussions about this in the comments, starting at https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16528293&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528293 and running all the way through https://issues.apache.org/jira/browse/SPARK-24615?focusedCommentId=16567393&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16567393

Just to summarize, after those discussions the proposal there was something like:
{code:java}
val rdd2 = rdd1.withResources(....).mapPartitions(){code}
Possibly getting more detailed:
 rdd.withResources
   .prefer("/gpu/k80", 2) // prefix of resource logical name, amount
   .require("/cpu", 1)
   .require("/memory", 8192000000)
   .require("/disk", 1000000000000)
The withResources would apply to just that stage. If there were conflicting resources in, say, a join:
 val rddA = rdd.withResources.mapPartitions()

val rddB = rdd.withResources.mapPartitions()

val rddC = rddA.join(rddB)
Then you would have to resolve that conflict, either by choosing the largest of the requirements or by merging them, which effectively means taking the larger of the two for each resource.
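As a concrete illustration of that merge option (plain Scala, not an actual or proposed Spark API; mergeByMax is a hypothetical helper), taking the maximum per resource would look roughly like this:
{code:java}
// Hypothetical sketch: merge two per-task requirement maps by keeping every
// resource and taking the larger amount wherever both sides request it.
def mergeByMax(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
  (a.keySet ++ b.keySet).map { res =>
    res -> math.max(a.getOrElse(res, 0L), b.getOrElse(res, 0L))
  }.toMap

// e.g. rddA asks for 3 gpus and 8 GB, rddB asks for 4 gpus and 4 GB:
val merged = mergeByMax(
  Map("/gpu" -> 3L, "/memory" -> 8192000000L),
  Map("/gpu" -> 4L, "/memory" -> 4096000000L))
// merged == Map("/gpu" -> 4, "/memory" -> 8192000000){code}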

There are also a lot of corner cases we need to handle, such as those mentioned in that discussion:
{code:java}
So this means RDDs with different resource requirements in one stage may have conflicts. For example, in rdd1.withResources.mapPartitions { xxx }.withResources.mapPartitions { xxx }.collect, the resources for rdd1 may be different from those for the map RDD, so currently what I can think of is:
1. Always pick the latter, with a warning log saying that multiple different resource requirements in one stage is illegal.
2. Fail the stage, with a warning log saying that multiple different resource requirements in one stage is illegal.
3. Merge conflicts by taking the maximum resource needs. For example, if rdd1 requires 3 GPUs per task and rdd2 requires 4 GPUs per task, the merged requirement would be 4 GPUs per task. (This is the high-level description; the details will be per-partition merging.) [chosen]
{code}

One thing that was brought up multiple times is how a user will really know what stage it applies to.  Users aren't necessarily going to realize where the stage boundaries are.  I don't think anyone has a good solution to this.

Also, I think for this to really be useful it has to be tied into dynamic allocation. Without that, users can just use the application-level task configs we are adding in SPARK-24615.

Of course, the original proposal was only for RDDs as well. That was because it goes along with barrier scheduling, and I think with the Dataset/DataFrame API it is even harder to know where the stage boundaries are because Catalyst can optimize a bunch of things.

 



> Support Stage level resource configuration and scheduling
> ---------------------------------------------------------
>
>                 Key: SPARK-27495
>                 URL: https://issues.apache.org/jira/browse/SPARK-27495
>             Project: Spark
>          Issue Type: Story
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Priority: Major
>
> Currently Spark supports CPU-level scheduling, and we are adding accelerator-aware scheduling with https://issues.apache.org/jira/browse/SPARK-24615, but both of those schedule via application-level configurations, meaning there is one configuration set for the entire lifetime of the application that the user can't change between Spark jobs/stages within that application.
> Many times users have different requirements for different stages of their application so they want to be able to configure at the stage level what resources are required for that stage.
> For example, I might start a Spark application which first does some ETL work that needs lots of cores to run many tasks in parallel; once that is done I want to run an ML job, and at that point I want GPUs, fewer CPUs, and more memory.
> With this Jira we want to add the ability for users to specify the resources for different stages.
> Note that https://issues.apache.org/jira/browse/SPARK-24615 had some discussions on this, but this part of it was removed from that issue.
> We should come up with a proposal on how to do this.


