You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "wang-zhun (Jira)" <ji...@apache.org> on 2021/12/09 09:45:00 UTC

[jira] [Updated] (SPARK-37595) DatasourceV2 `exists ... select *` column push down

     [ https://issues.apache.org/jira/browse/SPARK-37595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wang-zhun updated SPARK-37595:
------------------------------
    Description: 
datasourcev2表在执行TPCDS时很慢,因为`exists ... select *`不会将裁剪后的列下推到数据源

在 `org.apache.spark.sql.connector.DataSourceV2SQLSuite` 中添加测试
{code:java}

{code}
测试(“datasourcev2 存在”){
{code:java}

{code}
    val t1 = s"${catalogAndNamespace}t1"
{code:java}

{code}
    withTable(t1) {
{code:java}

{code}
      sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format")
{code:java}

{code}
      val t2 = s"${catalogAndNamespace}t2"
{code:java}

{code}
      withTable(t2) {
{code:java}

{code}
        sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format")
{code:java}

{code}
        val query = sql(s"select * from $t1 where not exist" +
{code:java}

{code}
            s"(select * from $t2 where t1.col1=t2.col1)").queryExecution
{code:java}

{code}
        // scalastyle:off println
{code:java}

{code}
        println(查询。执行计划)
{code:java}

{code}
      }
{code:java}

{code}
    }
{code:java}

{code}
  }AdaptiveSparkPlan isFinalPlan=false
{code:java}

{code}
+- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false
{code:java}

{code}
   :- 项目 [col1#17, col2#18]
{code:java}

{code}
   : +- BatchScan[col1#17, col2#18] 类 org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
{code:java}

{code}
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28]
{code:java}

{code}
      +- 项目 [col1#19]
{code:java}

{code}
         +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
{code:java}



{code}
期望是 `BatchScan[col1#19] class org.apache.spark.sql。 connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []`

原因`Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown`在`Batch("RewriteSubquery"`之前执行,并行datasourceV2不支持`FileSourceStrategy`

  was:
The datasourcev2 table is very slow when executing TPCDS, because `exists ... select *` will not push down the cropped columns to the data source

Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite`
```
test("datasourcev2 exists") {
    val t1 = s"${catalogAndNamespace}t1"
    withTable(t1) {
      sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format")
      val t2 = s"${catalogAndNamespace}t2"
      withTable(t2) {
        sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format")
        val query = sql(s"select * from $t1 where not exists" +
            s"(select * from $t2 where t1.col1=t2.col1)").queryExecution
        // scalastyle:off println
        println(query.executedPlan)
      }
    }
  }

AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false
   :- Project [col1#17, col2#18]
   :  +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28]
      +- Project [col1#19]
         +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []

Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []`
```

Reason `Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown` is executed before `Batch("RewriteSubquery"`, parallel datasourceV2 does not support `FileSourceStrategy`


> DatasourceV2 `exists ... select *` column push down
> ---------------------------------------------------
>
>                 Key: SPARK-37595
>                 URL: https://issues.apache.org/jira/browse/SPARK-37595
>             Project: Spark
>          Issue Type: Wish
>          Components: SQL
>    Affects Versions: 3.1.2, 3.2.0
>            Reporter: wang-zhun
>            Priority: Major
>
> datasourcev2表在执行TPCDS时很慢,因为`exists ... select *`不会将裁剪后的列下推到数据源
> 在 `org.apache.spark.sql.connector.DataSourceV2SQLSuite` 中添加测试
> {code:java}
> {code}
> 测试(“datasourcev2 存在”){
> {code:java}
> {code}
>     val t1 = s"${catalogAndNamespace}t1"
> {code:java}
> {code}
>     withTable(t1) {
> {code:java}
> {code}
>       sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format")
> {code:java}
> {code}
>       val t2 = s"${catalogAndNamespace}t2"
> {code:java}
> {code}
>       withTable(t2) {
> {code:java}
> {code}
>         sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format")
> {code:java}
> {code}
>         val query = sql(s"select * from $t1 where not exist" +
> {code:java}
> {code}
>             s"(select * from $t2 where t1.col1=t2.col1)").queryExecution
> {code:java}
> {code}
>         // scalastyle:off println
> {code:java}
> {code}
>         println(查询。执行计划)
> {code:java}
> {code}
>       }
> {code:java}
> {code}
>     }
> {code:java}
> {code}
>   }AdaptiveSparkPlan isFinalPlan=false
> {code:java}
> {code}
> +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false
> {code:java}
> {code}
>    :- 项目 [col1#17, col2#18]
> {code:java}
> {code}
>    : +- BatchScan[col1#17, col2#18] 类 org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
> {code:java}
> {code}
>    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28]
> {code:java}
> {code}
>       +- 项目 [col1#19]
> {code:java}
> {code}
>          +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []
> {code:java}
> {code}
> 期望是 `BatchScan[col1#19] class org.apache.spark.sql。 connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []`
> 原因`Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown`在`Batch("RewriteSubquery"`之前执行,并行datasourceV2不支持`FileSourceStrategy`



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org