Posted to dev@spark.apache.org by "Wang, Carson" <ca...@intel.com> on 2018/07/27 16:48:40 UTC

[DISCUSS] Adaptive execution in Spark SQL

Dear all,

The initial support for adaptive execution [SPARK-9850<https://issues.apache.org/jira/browse/SPARK-9850>] has been in Spark SQL since Spark 1.6, but it has not been updated since then. One of the key features of adaptive execution is determining the number of reducers automatically at runtime. Many Spark users need this feature, especially infrastructure teams, because thousands of queries run on a cluster and the shuffle partition number may not be set properly for every query. A single shuffle partition number also doesn't work well for all stages in a query, because each stage has a different input data size. Other features of adaptive execution include optimizing the join strategy at runtime and handling skewed joins automatically, neither of which has been implemented in Spark.

In the current implementation, an ExchangeCoordinator is used to determine the number of post-shuffle partitions for a stage. However, the exchange coordinator is added at the same time as the Exchange, so it lacks a global picture of all shuffle dependencies of a post-shuffle stage. For example, for a join of 3 tables in a single stage, the same ExchangeCoordinator should be used by all three Exchanges, but currently two separate ExchangeCoordinators are added. It also adds extra Exchanges in some cases. So I think it is time to rethink how to better support adaptive execution in Spark SQL. I have proposed a new approach in SPARK-23128<https://issues.apache.org/jira/browse/SPARK-23128>. The idea is described in a document here<https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing>. How to change a sort merge join to a broadcast hash join at runtime is described in a separate doc<https://docs.google.com/document/d/1WCJ2BmA8_dJL_jmYie_x9ZCrz7r3ZjleJSoX0dlDXaw/edit?usp=sharing>.
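
To illustrate why the coordinator needs to see every shuffle that feeds a stage, here is a minimal Python sketch of size-based partition coalescing. It is not the ExchangeCoordinator code: the function name `coalesce_partitions`, the `target_bytes` parameter, and the sample sizes are assumptions made up for this example.

```python
from typing import List


def coalesce_partitions(sizes_per_shuffle: List[List[int]], target_bytes: int) -> List[int]:
    """Return the start index of each coalesced post-shuffle partition.

    sizes_per_shuffle[k][i] is the number of bytes shuffle k produced for
    pre-shuffle partition i. All shuffles feeding one stage must be
    coalesced with the same boundaries, so their sizes are summed first.
    """
    if not sizes_per_shuffle:
        return []
    num_partitions = len(sizes_per_shuffle[0])
    if any(len(s) != num_partitions for s in sizes_per_shuffle):
        raise ValueError("all shuffles must have the same partition count")

    # Total input of reducer i across every shuffle of the stage.
    totals = [sum(s[i] for s in sizes_per_shuffle) for i in range(num_partitions)]

    starts = [0] if num_partitions else []
    current = 0
    for i, size in enumerate(totals):
        # Open a new partition once adding this one would exceed the target.
        if i > 0 and current + size > target_bytes:
            starts.append(i)
            current = 0
        current += size
    return starts


# Hypothetical 3-table join in one stage: three shuffles, five partitions each.
a = [10, 20, 30, 40, 50]
b = [5, 5, 5, 5, 5]
c = [1, 1, 1, 1, 1]

all_three = coalesce_partitions([a, b, c], target_bytes=64)  # [0, 2, 3, 4]
only_one = coalesce_partitions([a], target_bytes=64)         # [0, 3, 4]
```

In this toy input, a coordinator that sees only one of the three shuffles picks different boundaries than one that sees all of them, which is the "global picture" problem described above.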

The docs have been available for a while, and I also have an implementation based on Spark 2.3 at https://github.com/Intel-bigdata/spark-adaptive. The code is split into 7 PRs labeled AE2.3-0x, which you can find among the pull requests. I asked many partners, including Baidu, Alibaba, and JD.com, to evaluate the patch and received very good feedback. Baidu also shared their results on the JIRA. We also completed a 100 TB TPC-DS benchmark with the patch earlier; all queries passed, with good performance improvement.

I'd like to call for a review of the docs, and even the code, and we can discuss further in this thread. Thanks very much!

Thanks,
Carson


Re: [DISCUSS] Adaptive execution in Spark SQL

Posted by "Yu, Yucai" <yy...@ebay.com>.
Hi,

I would like to share some experience from using AE in eBay’s data warehouse.


  1.  Saving a lot of manual setting and tuning effort. Setting shuffle.partition query by query is tedious; with AE, we just need to set one big number for all queries.
  2.  Saving memory. With AE, we can start fewer executors and fewer tasks for small stages, which greatly helps save memory. We see a 1.5x improvement in MB-seconds for many queries.
  3.  Reducing execution time by converting SortMergeJoin to BroadcastHashJoin. Some kinds of queries benefit a lot from this, especially when one side of the intermediate tables is skewed or quite large; we see a 2-3x improvement.
  4.  Reducing the number of output files. Based on the partition data size, AE can merge small files in the last stage, which is quite helpful for HDFS.
  5.  Handling skewed joins. Data skew is difficult to handle in the current Spark engine because it is hard to detect in the planning phase. AE can detect skewed data at runtime and dynamically increase parallelism for the skewed task. We see a 1.6x improvement in execution time.
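
As a rough illustration of item 5, the Python sketch below flags skewed partitions from runtime sizes and works out how many sub-tasks each would need. It is only a sketch under stated assumptions: the median-times-factor rule, the names `find_skewed_partitions` and `split_count`, and the thresholds are hypothetical and not taken from the patch.

```python
from statistics import median
from typing import List


def find_skewed_partitions(sizes: List[int], factor: float = 5.0, min_bytes: int = 0) -> List[int]:
    """Return indices of partitions much larger than the median partition.

    Assumed rule: a partition is skewed when it is larger than
    `factor` times the median size and also larger than `min_bytes`.
    """
    if not sizes:
        return []
    med = median(sizes)
    return [i for i, s in enumerate(sizes) if s > med * factor and s > min_bytes]


def split_count(size: int, target_bytes: int) -> int:
    """Number of sub-tasks for one partition: ceil(size / target_bytes), at least 1."""
    return max(1, -(-size // target_bytes))


# Hypothetical partition sizes (in MB) observed after the map stage finishes.
sizes_mb = [100, 120, 90, 110, 2000]
skewed = find_skewed_partitions(sizes_mb)  # [4]
tasks = {i: split_count(sizes_mb[i], target_bytes=128) for i in skewed}  # {4: 16}
```

The point of the sketch is that these sizes are only known at runtime, which is why the skew is hard to detect in the planning phase.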

AE is landing in our biggest Spark cluster, and we can share more real production performance numbers in the near future.

Thanks,
Yucai


RE: [DISCUSS] Adaptive execution in Spark SQL

Posted by "Wang, Carson" <ca...@intel.com>.
Thanks Marco and Wenchen for reviewing. It sounds good to target this for 3.0.

I can also share more data on the benchmark. In the 100 TB TPC-DS benchmark we ran on a 100-node cluster, 90% of the 103 queries showed a performance gain, and 46% of them were more than 1.1x faster. Individual queries gained up to 3.8x (q8: 3.8x, q81: 2.1x, q30: 2.1x, q51: 1.8x, q61: 1.6x, q60: 1.6x, …). In addition, 5 queries that failed earlier pass successfully in adaptive execution mode. The detailed report is also available here<https://software.intel.com/en-us/articles/spark-sql-adaptive-execution-at-100-tb>.

Thanks,
Carson


Re: [DISCUSS] Adaptive execution in Spark SQL

Posted by Marco Gaido <ma...@gmail.com>.
Hi all,

I also like this idea very much, and I think it may bring other
performance improvements in the future as well.

Thanks to everybody who worked on this.

I agree to target this feature for 3.0.

Thanks everybody,
Bests.
Marco


Re: [DISCUSS] Adaptive execution in Spark SQL

Posted by Wenchen Fan <cl...@gmail.com>.
Hi Carson and Yuanjian,

Thanks for contributing to this project and sharing the production use
cases! I believe adaptive execution will be a very important feature of
Spark SQL and will definitely benefit a lot of users.

I went through the design docs, and the high-level design totally makes
sense to me. Since the code freeze of Spark 2.4 is close, I'm afraid we may
not have enough time to review and merge the code. How about we target
this feature for Spark 3.0?

Besides, it would be great if we could have some real benchmark numbers for
it.

Thanks,
Wenchen


Re: [DISCUSS] Adaptive execution in Spark SQL

Posted by Yuanjian Li <xy...@gmail.com>.
Thanks Carson, great note!
Baidu has actually ported this patch into our internal fork. I collected
some use cases and the performance improvements we saw while using the
patch internally at Baidu, summarized in the following 3 scenarios:
1. SortMergeJoin to BroadcastJoin
Converting a SortMergeJoin to a BroadcastJoin deep in the plan tree can
give us a 50% to 200% boost in query performance. This strategy always
hits BI scenarios such as joining several tables with filters in
subqueries.
2. Long-running applications or using Spark as a service
Here, a long-running application means one that runs for nearly 1 hour.
Using Spark as a service means using spark-shell and continually
submitting SQL, or using a Spark-based service such as Zeppelin, Livy, or
our internal SQL service Baidu BigSQL. In such scenarios, all Spark jobs
share the same partition number, so enabling AE and adding configs for the
expected task info (data size, row count, min/max partition number, etc.)
gives us a 50%-100% boost in performance.
3. GraphFrame jobs
The last scenario is an application that uses GraphFrame. In this case, the
user has a 2-dimensional graph with 1 billion edges and runs the connected
components algorithm in GraphFrame. With AE enabled, the duration of the
app drops from 58 min to 32 min, an almost 100% boost in performance
(about 1.8x faster).
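
Scenario 1 can be pictured with a small Python sketch that compares the join choice made from a planning-time estimate with the choice made from the size measured at runtime. This is an illustration under stated assumptions, not the patch's logic: `choose_join` and the byte counts are hypothetical, and the 10 MB default only mirrors the default value of `spark.sql.autoBroadcastJoinThreshold`.

```python
def choose_join(left_bytes: int, right_bytes: int,
                broadcast_threshold: int = 10 * 1024 * 1024) -> str:
    """Pick a join strategy from the sizes of the two join inputs."""
    if min(left_bytes, right_bytes) <= broadcast_threshold:
        # Broadcast the smaller side and build the hash table from it.
        side = "left" if left_bytes <= right_bytes else "right"
        return f"BroadcastHashJoin(build={side})"
    return "SortMergeJoin"


GB = 1024 ** 3
MB = 1024 ** 2

# Planning time: the filtered subquery is estimated at 5 GB, so no broadcast.
planned = choose_join(50 * GB, 5 * GB)   # "SortMergeJoin"

# Runtime: the filter turned out to be selective and only 2 MB was shuffled.
adaptive = choose_join(50 * GB, 2 * MB)  # "BroadcastHashJoin(build=right)"
```

The same function gives a different answer once the real intermediate size is known, which is the case for the filtered subqueries in the BI scenario.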

The detailed screenshots and configs are in the PDF attached to JIRA
SPARK-23128 <https://issues.apache.org/jira/browse/SPARK-23128>.

Thanks,
Yuanjian Li
