Posted to dev@spark.apache.org by Chenghao Lyu <ch...@cs.umass.edu> on 2022/09/29 12:53:41 UTC

Deploying stage-level scheduling for Spark SQL

Hi,

I plan to deploy stage-level scheduling for Spark SQL in order to apply some fine-grained optimizations over the DAG of stages. However, I am blocked by the following issues:

1. The current stage-level scheduling supports the RDD API only. So, is there a way to reuse stage-level scheduling for Spark SQL? E.g., how can the RDD code (the transformations and actions) be exposed from a Spark SQL query (written with SQL syntax)? (A sketch of the existing RDD-level API follows this list.)
2. We do not quite understand why a Spark SQL query can trigger multiple jobs and have some RDDs regenerated, as posted here. Can anyone give us some insight into the reasons and whether we can avoid the RDD regeneration to save execution time?
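(For reference, the existing RDD-level API looks roughly like the sketch below. It is only a sketch, with placeholder paths and resource numbers, assuming Spark 3.1+ where stage-level scheduling also requires dynamic allocation.)

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}
import org.apache.spark.sql.SparkSession

object StageLevelRddSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("stage-level-rdd-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Describe the executor/task resources wanted for the stages computing this RDD.
    val execReqs = new ExecutorResourceRequests().cores(4).memory("8g")
    val taskReqs = new TaskResourceRequests().cpus(1)
    val profile  = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

    // The API is attached to RDDs only; nothing equivalent is reachable from a
    // SQL string or a DataFrame, which is exactly the gap point 1 asks about.
    val counts = sc.textFile("hdfs:///tmp/words.txt") // placeholder input path
      .flatMap(_.split("\\s+"))
      .map((_, 1L))
      .withResources(profile)
      .reduceByKey(_ + _)

    counts.take(10).foreach(println)
    spark.stop()
  }
}
```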

Thanks in advance.

Cheers,
Chenghao

Re: Deploying stage-level scheduling for Spark SQL

Posted by Tom Graves <tg...@yahoo.com.INVALID>.
1) In my opinion this is too complex for the average user. In this case I'm assuming you have some sort of optimizer that would apply it automatically for the user? If it's just in the research stage of things, can you just modify Spark to do your experiments?
2) I think the main thing is having the heuristics and logic for changing what the user requested. It sounds like you might be working on a component to do this, but I haven't read the paper you pointed to yet either.
Also note there are already plugin points in Spark to add rules to the optimizer and the physical plan (e.g. for columnar execution). It sounds to me like you might be working on something that would fit better as a plugin if it automatically figures out what it thinks the best configuration is. If that is the case, I go back to number 1 above: can you modify Spark to add the plugin point you need for your experimentation, to see if it makes sense?
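(For reference, the plugin point being referred to here is the `spark.sql.extensions` hook on `SparkSessionExtensions`. A minimal sketch of injecting a no-op optimizer rule might look like the following; the package and class names are placeholders.)

```scala
package com.example // placeholder package

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule: it changes nothing and only marks where automatic
// per-stage resource logic could inspect or annotate the optimized plan.
case class ResourceHintRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo(s"ResourceHintRule visited a plan with ${plan.children.size} children")
    plan
  }
}

// Enabled with: --conf spark.sql.extensions=com.example.ResourceOptimizerExtensions
class ResourceOptimizerExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule(session => ResourceHintRule(session))
  }
}
```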
Tom

On Friday, September 30, 2022, 11:31:35 AM CDT, Chenghao Lyu <ch...@cs.umass.edu> wrote:
 
Thanks for the clarification, Tom!

A bit more background on what we want to do: we have proposed a fine-grained (stage-level) resource optimization approach in VLDB 2022 (https://www.vldb.org/pvldb/vol15/p3098-lyu.pdf) and would like to try it on Spark. Our approach recommends the resource configuration for each stage automatically (using ML and our optimization framework), and we would like to see how to embed it in Spark. Initially, we assume AQE is disabled to keep things simpler.

Now I see the problem as twofold (in both cases, the stage-level configurations will be chosen automatically by our algorithm within the upper and lower bounds of each tunable resource given by the user; a small illustrative sketch of this bound handling follows the two cases below):

(1) If AQE is disabled in Spark SQL, so that the RDD DAG does not change once the physical plan is selected, do you think it is feasible and worthwhile to expose the RDDs and reuse the existing stage-level scheduling API for optimization?
(2) If AQE is enabled in Spark SQL, I agree it would be preferable to add the stage-level resource optimization inside AQE. Since I am not very experienced with that part, could you list more of the potential challenges it may lead to?
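(Purely as an illustration of the bound handling mentioned above, not any Spark API: a hypothetical helper that clamps a recommended per-stage value into the user-supplied range could look like this.)

```scala
// Hypothetical helper, not part of Spark: clamp a recommended per-stage value
// into the user-supplied [lower, upper] range before it is applied.
final case class ResourceBound(lower: Int, upper: Int) {
  require(lower <= upper, "lower bound must not exceed upper bound")
  def clamp(recommended: Int): Int = math.min(upper, math.max(lower, recommended))
}

object BoundSketch {
  def main(args: Array[String]): Unit = {
    val executorCores = ResourceBound(lower = 2, upper = 8)
    // Suppose the optimizer recommends 12 cores for a shuffle-heavy stage:
    println(executorCores.clamp(12)) // prints 8, the user's upper bound
  }
}
```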

Thanks in advance and I would really appreciate it if you could give us more feedback!
Cheers,
Chenghao
On Sep 30, 2022, 4:22 PM +0200, Tom Graves <tg...@yahoo.com.INVALID>, wrote:

See the original SPIP for why we only support RDDs: https://issues.apache.org/jira/browse/SPARK-27495

The main problem is exactly what you are referring to. The RDD level is not exposed to the user when using the SQL or DataFrame API. This is on purpose: the user shouldn't have to know anything about the underlying implementation using RDDs, especially with AQE and other optimizations that could change things. You may start out with one physical plan and AQE can change it along the way, so how does the user change the RDD at that point? It would be very difficult to expose this to the user and I don't think it should be. I think we would have to come up with some other way to apply stage-level scheduling to SQL/DataFrame, or, as mentioned in the original issue, if AQE gets smart enough it would just do it for the user, but there are lots of factors that come into play that make that difficult as well.
Tom

On Friday, September 30, 2022, 04:15:36 AM CDT, Chenghao Lyu <ch...@cs.umass.edu> wrote:

Thanks for the reply! 

To clarify: for issue 2, a query can still be broken into multiple jobs without AQE; I had turned off AQE in my posted example.
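(Concretely, AQE is controlled by the standard `spark.sql.adaptive.enabled` flag; the snippet below is only a sketch of how a session can be configured with it off.)

```scala
import org.apache.spark.sql.SparkSession

// Sketch: fix the physical plan (and hence the RDD DAG) at planning time by disabling AQE.
val spark = SparkSession.builder()
  .appName("no-aqe-sketch")
  .config("spark.sql.adaptive.enabled", "false")
  .getOrCreate()

// It can also be toggled on an existing session:
// spark.conf.set("spark.sql.adaptive.enabled", "false")
```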

For 1, an end user would just turn a knob on or off to use stage-level scheduling for Spark SQL; I am considering adding a component between the Spark SQL module and the Spark Core module to optimize the stage-level resources.

Yes, SQL is declarative. It uses a sequence of components (such as the logical planner, the physical planner, and the CBO) to arrive at a selected physical plan. The RDDs (with their transformations) are generated from the selected physical plan for execution. For now, we can only get the top-level RDD of the RDD DAG via `spark.sql(q1).queryExecution.toRdd`, which is not enough to make stage-level scheduling decisions, since the stage-level resources are profiled based on the RDDs. If we could expose all the RDDs instead of only the top-level one, it seems possible to apply stage-level scheduling here.
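(To make this concrete, here is a rough sketch of what is reachable today and how the rest of the DAG could in principle be enumerated by walking `dependencies`; the query string is a placeholder.)

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("rdd-dag-sketch").getOrCreate()
val q1 = "SELECT col, count(*) FROM some_table GROUP BY col" // placeholder query

// The only RDD handle Spark SQL exposes today: the root of the query's RDD DAG.
val root: RDD[_] = spark.sql(q1).queryExecution.toRdd

// Walk the lineage via dependencies to enumerate the rest of the DAG, which is
// roughly what per-stage resource profiling would need to see.
def collectRdds(rdd: RDD[_]): Seq[RDD[_]] = {
  val seen = mutable.LinkedHashSet[RDD[_]]()
  def visit(r: RDD[_]): Unit = if (seen.add(r)) r.dependencies.foreach(d => visit(d.rdd))
  visit(rdd)
  seen.toSeq
}

collectRdds(root).foreach(r => println(s"RDD ${r.id}: ${r.getClass.getSimpleName}"))
```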


P.S. Let me attach the link about the RDD regeneration explicitly, in case it is not shown on the mailing-list website: https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql
Cheers,
Chenghao
On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell <he...@databricks.com>, wrote:

I think issue 2 is caused by adaptive query execution. This will break apart queries into multiple jobs; each subsequent job will generate an RDD that is based on previous ones.
As for 1, I am not sure how much you want to expose to an end user here. SQL is declarative, and it does not specify how a query should be executed. I can imagine that you might use different resources for different types of stages, e.g. a scan stage versus more compute-heavy stages. This, IMO, should be based on analyzing and costing the plan. For this, RDD-only stage-level scheduling should be sufficient.
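(One possible reading of this, sketched below with a placeholder query: inspect the selected physical plan and use the mix of file-scan nodes and shuffle exchanges as a crude proxy for how scan-heavy or compute-heavy the work is. It assumes AQE is disabled so that `executedPlan` is the final physical plan rather than an adaptive wrapper, and it is only an illustration, not an existing feature.)

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val spark = SparkSession.builder()
  .appName("plan-costing-sketch")
  .config("spark.sql.adaptive.enabled", "false") // keep executedPlan a plain physical plan
  .getOrCreate()

val q = "SELECT col, count(*) FROM some_table GROUP BY col" // placeholder query
val plan = spark.sql(q).queryExecution.executedPlan

// Crude proxy: leaf file scans suggest scan-dominated work, while shuffle
// exchanges mark boundaries where more compute/shuffle-heavy stages begin.
val scans    = plan.collect { case s: FileSourceScanExec => s }
val shuffles = plan.collect { case e: ShuffleExchangeExec => e }

println(s"file scans: ${scans.size}, shuffle exchanges: ${shuffles.size}")
scans.foreach(s => println(s"scan over ${s.relation.location.rootPaths.mkString(", ")}"))
```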
On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu <ch...@cs.umass.edu> wrote:

Hi, 

I plan to deploy the stage-level scheduling for Spark SQL to apply some fine-grained optimizations over the DAG of stages. However, I am blocked by the following issues:   
   - The current stage-level scheduling supports the RDD API only. So, is there a way to reuse stage-level scheduling for Spark SQL? E.g., how can the RDD code (the transformations and actions) be exposed from a Spark SQL query (written with SQL syntax)?
   - We do not quite understand why a Spark SQL query can trigger multiple jobs and have some RDDs regenerated, as posted here. Can anyone give us some insight into the reasons and whether we can avoid the RDD regeneration to save execution time?
Thanks in advance.
Cheers, Chenghao


 

Re: Deploying stage-level scheduling for Spark SQL

Posted by Chenghao Lyu <ch...@cs.umass.edu>.
Thanks for the clarification, Tom!

A bit more background on what we want to do: we have proposed a fine-grained (stage-level) resource optimization approach in VLDB 2022 (https://www.vldb.org/pvldb/vol15/p3098-lyu.pdf) and would like to try it on Spark. Our approach recommends the resource configuration for each stage automatically (using ML and our optimization framework), and we would like to see how to embed it in Spark. Initially, we assume AQE is disabled to keep things simpler.

Now I see the problem as twofold (in both cases, the stage-level configurations will be chosen automatically by our algorithm within the upper and lower bounds of each tunable resource given by the user):

(1) If AQE is disabled in Spark SQL, so that the RDD DAG does not change once the physical plan is selected, do you think it is feasible and worthwhile to expose the RDDs and reuse the existing stage-level scheduling API for optimization?
(2) If AQE is enabled in Spark SQL, I agree it would be preferable to add the stage-level resource optimization inside AQE. Since I am not very experienced with that part, could you list more of the potential challenges it may lead to?

Thanks in advance and I would really appreciate it if you could give us more feedback!

Cheers,
Chenghao
On Sep 30, 2022, 4:22 PM +0200, Tom Graves <tg...@yahoo.com.INVALID>, wrote:
> See the original SPIP for why we only support RDDs: https://issues.apache.org/jira/browse/SPARK-27495
>
>
> The main problem is exactly what you are referring to. The RDD level is not exposed to the user when using the SQL or DataFrame API. This is on purpose: the user shouldn't have to know anything about the underlying implementation using RDDs, especially with AQE and other optimizations that could change things. You may start out with one physical plan and AQE can change it along the way, so how does the user change the RDD at that point? It would be very difficult to expose this to the user and I don't think it should be. I think we would have to come up with some other way to apply stage-level scheduling to SQL/DataFrame, or, as mentioned in the original issue, if AQE gets smart enough it would just do it for the user, but there are lots of factors that come into play that make that difficult as well.
>
> Tom
> On Friday, September 30, 2022, 04:15:36 AM CDT, Chenghao Lyu <ch...@cs.umass.edu> wrote:
>
>
> Thanks for the reply!
>
> To clarify: for issue 2, a query can still be broken into multiple jobs without AQE; I had turned off AQE in my posted example.
>
> For 1, an end user would just turn a knob on or off to use stage-level scheduling for Spark SQL; I am considering adding a component between the Spark SQL module and the Spark Core module to optimize the stage-level resources.
>
> Yes, SQL is declarative. It uses a sequence of components (such as the logical planner, the physical planner, and the CBO) to arrive at a selected physical plan. The RDDs (with their transformations) are generated from the selected physical plan for execution. For now, we can only get the top-level RDD of the RDD DAG via `spark.sql(q1).queryExecution.toRdd`, which is not enough to make stage-level scheduling decisions, since the stage-level resources are profiled based on the RDDs. If we could expose all the RDDs instead of only the top-level one, it seems possible to apply stage-level scheduling here.
>
>
> P.S. Let me attach the link about the RDD regeneration explicitly, in case it is not shown on the mailing-list website: https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql
>
> Cheers,
> Chenghao
> On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell <he...@databricks.com>, wrote:
> > I think issue 2 is caused by adaptive query execution. This will break apart queries into multiple jobs; each subsequent job will generate an RDD that is based on previous ones.
> >
> > As for 1, I am not sure how much you want to expose to an end user here. SQL is declarative, and it does not specify how a query should be executed. I can imagine that you might use different resources for different types of stages, e.g. a scan stage versus more compute-heavy stages. This, IMO, should be based on analyzing and costing the plan. For this, RDD-only stage-level scheduling should be sufficient.
> >
> > On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu <ch...@cs.umass.edu> wrote:
> > > Hi,
> > >
> > > I plan to deploy the stage-level scheduling for Spark SQL to apply some fine-grained optimizations over the DAG of stages. However, I am blocked by the following issues:
> > >
> > > 1. The current stage-level scheduling supports the RDD API only. So, is there a way to reuse stage-level scheduling for Spark SQL? E.g., how can the RDD code (the transformations and actions) be exposed from a Spark SQL query (written with SQL syntax)?
> > > 2. We do not quite understand why a Spark SQL query can trigger multiple jobs and have some RDDs regenerated, as posted here. Can anyone give us some insight into the reasons and whether we can avoid the RDD regeneration to save execution time?
> > >
> > > Thanks in advance.
> > >
> > > Cheers,
> > > Chenghao

Re: Deploying stage-level scheduling for Spark SQL

Posted by Tom Graves <tg...@yahoo.com.INVALID>.
See the original SPIP for why we only support RDDs: https://issues.apache.org/jira/browse/SPARK-27495

The main problem is exactly what you are referring to. The RDD level is not exposed to the user when using the SQL or DataFrame API. This is on purpose: the user shouldn't have to know anything about the underlying implementation using RDDs, especially with AQE and other optimizations that could change things. You may start out with one physical plan and AQE can change it along the way, so how does the user change the RDD at that point? It would be very difficult to expose this to the user and I don't think it should be. I think we would have to come up with some other way to apply stage-level scheduling to SQL/DataFrame, or, as mentioned in the original issue, if AQE gets smart enough it would just do it for the user, but there are lots of factors that come into play that make that difficult as well.
Tom

On Friday, September 30, 2022, 04:15:36 AM CDT, Chenghao Lyu <ch...@cs.umass.edu> wrote:
 
 Thanks for the reply! 

To clarify: for issue 2, a query can still be broken into multiple jobs without AQE; I had turned off AQE in my posted example.

For 1, an end user would just turn a knob on or off to use stage-level scheduling for Spark SQL; I am considering adding a component between the Spark SQL module and the Spark Core module to optimize the stage-level resources.

Yes, SQL is declarative. It uses a sequence of components (such as the logical planner, the physical planner, and the CBO) to arrive at a selected physical plan. The RDDs (with their transformations) are generated from the selected physical plan for execution. For now, we can only get the top-level RDD of the RDD DAG via `spark.sql(q1).queryExecution.toRdd`, which is not enough to make stage-level scheduling decisions, since the stage-level resources are profiled based on the RDDs. If we could expose all the RDDs instead of only the top-level one, it seems possible to apply stage-level scheduling here.


P.S. Let me attach the link about the RDD regeneration explicitly, in case it is not shown on the mailing-list website: https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql
Cheers,
Chenghao
On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell <he...@databricks.com>, wrote:

I think issue 2 is caused by adaptive query execution. This will break apart queries into multiple jobs; each subsequent job will generate an RDD that is based on previous ones.
As for 1, I am not sure how much you want to expose to an end user here. SQL is declarative, and it does not specify how a query should be executed. I can imagine that you might use different resources for different types of stages, e.g. a scan stage versus more compute-heavy stages. This, IMO, should be based on analyzing and costing the plan. For this, RDD-only stage-level scheduling should be sufficient.
On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu <ch...@cs.umass.edu> wrote:

Hi, 

I plan to deploy the stage-level scheduling for Spark SQL to apply some fine-grained optimizations over the DAG of stages. However, I am blocked by the following issues:   
   - The current stage-level scheduling supports the RDD API only. So, is there a way to reuse stage-level scheduling for Spark SQL? E.g., how can the RDD code (the transformations and actions) be exposed from a Spark SQL query (written with SQL syntax)?
   - We do not quite understand why a Spark SQL query can trigger multiple jobs and have some RDDs regenerated, as posted here. Can anyone give us some insight into the reasons and whether we can avoid the RDD regeneration to save execution time?
Thanks in advance.
Cheers, Chenghao

  

Re: Deploying stage-level scheduling for Spark SQL

Posted by Chenghao Lyu <ch...@cs.umass.edu>.
Thanks for the reply!

To clarify: for issue 2, a query can still be broken into multiple jobs without AQE; I had turned off AQE in my posted example.

For 1, an end user would just turn a knob on or off to use stage-level scheduling for Spark SQL; I am considering adding a component between the Spark SQL module and the Spark Core module to optimize the stage-level resources.

Yes, SQL is declarative. It uses a sequence of components (such as the logical planner, the physical planner, and the CBO) to arrive at a selected physical plan. The RDDs (with their transformations) are generated from the selected physical plan for execution. For now, we can only get the top-level RDD of the RDD DAG via `spark.sql(q1).queryExecution.toRdd`, which is not enough to make stage-level scheduling decisions, since the stage-level resources are profiled based on the RDDs. If we could expose all the RDDs instead of only the top-level one, it seems possible to apply stage-level scheduling here.


P.S. Let me attach the link about the RDD regeneration explicitly, in case it is not shown on the mailing-list website: https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql

Cheers,
Chenghao
On Sep 29, 2022, 5:22 PM +0200, Herman van Hovell <he...@databricks.com>, wrote:
> I think issue 2 is caused by adaptive query execution. This will break apart queries into multiple jobs; each subsequent job will generate an RDD that is based on previous ones.
>
> As for 1, I am not sure how much you want to expose to an end user here. SQL is declarative, and it does not specify how a query should be executed. I can imagine that you might use different resources for different types of stages, e.g. a scan stage versus more compute-heavy stages. This, IMO, should be based on analyzing and costing the plan. For this, RDD-only stage-level scheduling should be sufficient.
>
> > On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu <ch...@cs.umass.edu> wrote:
> > > Hi,
> > >
> > > I plan to deploy the stage-level scheduling for Spark SQL to apply some fine-grained optimizations over the DAG of stages. However, I am blocked by the following issues:
> > >
> > > 1. The current stage-level scheduling supports the RDD API only. So, is there a way to reuse stage-level scheduling for Spark SQL? E.g., how can the RDD code (the transformations and actions) be exposed from a Spark SQL query (written with SQL syntax)?
> > > 2. We do not quite understand why a Spark SQL query can trigger multiple jobs and have some RDDs regenerated, as posted here. Can anyone give us some insight into the reasons and whether we can avoid the RDD regeneration to save execution time?
> > >
> > > Thanks in advance.
> > >
> > > Cheers,
> > > Chenghao

Re: Deploying stage-level scheduling for Spark SQL

Posted by Herman van Hovell <he...@databricks.com.INVALID>.
I think issue 2 is caused by adaptive query execution. This will break
apart queries into multiple jobs; each subsequent job will generate an RDD
that is based on previous ones.

As for 1, I am not sure how much you want to expose to an end user here.
SQL is declarative, and it does not specify how a query should be executed.
I can imagine that you might use different resources for different types of
stages, e.g. a scan stage versus more compute-heavy stages. This, IMO,
should be based on analyzing and costing the plan. For this, RDD-only
stage-level scheduling should be sufficient.

On Thu, Sep 29, 2022 at 8:56 AM Chenghao Lyu <ch...@cs.umass.edu> wrote:

> Hi,
>
> I plan to deploy the stage-level scheduling for Spark SQL to apply some
> fine-grained optimizations over the DAG of stages. However, I am blocked by
> the following issues:
>
>    1. The current stage-level scheduling
>    <https://spark.apache.org/docs/latest/configuration.html#stage-level-scheduling-overview>
>    supports the RDD API only. So, is there a way to reuse stage-level
>    scheduling for Spark SQL? E.g., how can the RDD code (the transformations
>    and actions) be exposed from a Spark SQL query (written with SQL syntax)?
>    2. We do not quite understand why a Spark SQL query can trigger multiple
>    jobs and have some RDDs regenerated, as posted *here*
>    <https://stackoverflow.com/questions/73895506/how-to-avoid-rdd-regeneration-in-spark-sql>.
>    Can anyone give us some insight into the reasons and whether we can
>    avoid the RDD regeneration to save execution time?
>
> Thanks in advance.
>
> Cheers,
> Chenghao
>