Posted to issues@spark.apache.org by "wang-zhun (Jira)" <ji...@apache.org> on 2021/12/09 09:45:00 UTC
[jira] [Updated] (SPARK-37595) DatasourceV2 `exists ... select *` column push down
[ https://issues.apache.org/jira/browse/SPARK-37595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
wang-zhun updated SPARK-37595:
------------------------------
Description:
DataSource V2 tables are very slow when running TPC-DS, because `exists ... select *` does not push the pruned columns down to the data source.
Add a test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite`:
{code:java}
test("datasourcev2 exists") {
  val t1 = s"${catalogAndNamespace}t1"
  withTable(t1) {
    sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format")
    val t2 = s"${catalogAndNamespace}t2"
    withTable(t2) {
      sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format")
      val query = sql(s"select * from $t1 where not exists" +
        s"(select * from $t2 where t1.col1=t2.col1)").queryExecution
      // scalastyle:off println
      println(query.executedPlan)
    }
  }
}
{code}
The printed plan:
{code:java}
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false
   :- Project [col1#17, col2#18]
   :  +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28]
      +- Project [col1#19]
         +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
{code}
Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []`
Reason: `Batch("Early Filter and Projection Push-Down"` (`V2ScanRelationPushDown`) is executed before `Batch("RewriteSubquery"`, and DataSource V2 does not support `FileSourceStrategy`, so the `Project [col1#19]` introduced by the subquery rewrite never reaches the V2 scan.
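The ordering problem described in the reason above can be illustrated with a toy model. This is only a sketch under stated assumptions: `Relation`, `Scan`, `Project`, `pushDown` and `rewriteSubquery` are hypothetical stand-ins written for this note, not Spark classes or rules, and the model ignores everything except which columns the scan reads.

```scala
// Toy model only: none of these types are Spark classes.
sealed trait Plan
// A table that has not yet been turned into a scan.
final case class Relation(table: String, columns: Seq[String]) extends Plan
// A scan whose column list is fixed at the moment it is built.
final case class Scan(table: String, readColumns: Seq[String]) extends Plan
final case class Project(columns: Seq[String], child: Plan) extends Plan

object BatchOrderSketch {
  // Stand-in for scan push-down: builds the scan and prunes columns
  // only if a Project already sits directly on top of the relation.
  def pushDown(plan: Plan): Plan = plan match {
    case Project(cols, Relation(t, _)) => Project(cols, Scan(t, cols))
    case Project(cols, child)          => Project(cols, pushDown(child))
    case Relation(t, all)              => Scan(t, all)
    case scan: Scan                    => scan
  }

  // Stand-in for the subquery rewrite: after it, only the join keys
  // of the subquery side are required.
  def rewriteSubquery(subquery: Plan, joinKeys: Seq[String]): Plan =
    Project(joinKeys, subquery)

  def main(args: Array[String]): Unit = {
    // Models the subquery side: select * from t2
    val subquery: Plan = Relation("t2", Seq("col1", "col2"))

    // Order described in the issue: the scan is built first,
    // so the later projection cannot shrink its column list.
    val reported = rewriteSubquery(pushDown(subquery), Seq("col1"))

    // Order that would prune: the projection exists before the scan is built.
    val expected = pushDown(rewriteSubquery(subquery, Seq("col1")))

    println(reported)
    println(expected)
  }
}
```

In the first ordering the toy scan still reads both `col1` and `col2`, which mirrors the reported `BatchScan[col1#19, col2#20]`. In the second it reads only `col1`, which mirrors the expected plan.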
was:
DataSource V2 tables are very slow when running TPC-DS, because `exists ... select *` does not push the pruned columns down to the data source.
Add a test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite`:
```
test("datasourcev2 exists") {
  val t1 = s"${catalogAndNamespace}t1"
  withTable(t1) {
    sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format")
    val t2 = s"${catalogAndNamespace}t2"
    withTable(t2) {
      sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format")
      val query = sql(s"select * from $t1 where not exists" +
        s"(select * from $t2 where t1.col1=t2.col1)").queryExecution
      // scalastyle:off println
      println(query.executedPlan)
    }
  }
}

AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false
   :- Project [col1#17, col2#18]
   :  +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28]
      +- Project [col1#19]
         +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
```
Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []`
Reason: `Batch("Early Filter and Projection Push-Down"` (`V2ScanRelationPushDown`) is executed before `Batch("RewriteSubquery"`, and DataSource V2 does not support `FileSourceStrategy`
> DatasourceV2 `exists ... select *` column push down
> ---------------------------------------------------
>
> Key: SPARK-37595
> URL: https://issues.apache.org/jira/browse/SPARK-37595
> Project: Spark
> Issue Type: Wish
> Components: SQL
> Affects Versions: 3.1.2, 3.2.0
> Reporter: wang-zhun
> Priority: Major