Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2019/01/30 08:04:00 UTC

[jira] [Assigned] (SPARK-26781) Additional exchange gets added

     [ https://issues.apache.org/jira/browse/SPARK-26781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26781:
------------------------------------

    Assignee:     (was: Apache Spark)

> Additional exchange gets added 
> -------------------------------
>
>                 Key: SPARK-26781
>                 URL: https://issues.apache.org/jira/browse/SPARK-26781
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Karuppayya
>            Priority: Major
>
> Consider three tables: a(id int), b(id int), c(id int)
> query:  
> {code:java}
> select * from (select a.id as newid from a join b where a.id = b.id) temp join c on temp.newid = c.id
> {code}
> Physical plan (org.apache.spark.sql.execution.QueryExecution#executedPlan):
>  
> {noformat}
> *(9) SortMergeJoin [newid#1L], [id#6L], Inner
> :- *(6) Sort [newid#1L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(newid#1L, 200)
> :     +- *(5) Project [id#2L AS newid#1L, name#3]
> :        +- *(5) SortMergeJoin [id#2L], [id#4L], Inner
> :           :- *(2) Sort [id#2L ASC NULLS FIRST], false, 0
> :           :  +- Exchange hashpartitioning(id#2L, 200)
> :           :     +- *(1) Project [id#2L, name#3]
> :           :        +- *(1) Filter isnotnull(id#2L)
> :           :           +- *(1) FileScan parquet a[id#2L,name#3] Batched: true, DataFilters: [isnotnull(id#2L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/a], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
> :           +- *(4) Sort [id#4L ASC NULLS FIRST], false, 0
> :              +- Exchange hashpartitioning(id#4L, 200)
> :                 +- *(3) Project [id#4L]
> :                    +- *(3) Filter isnotnull(id#4L)
> :                       +- *(3) FileScan parquet b[id#4L] Batched: true, DataFilters: [isnotnull(id#4L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/b], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
> +- *(8) Sort [id#6L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#6L, 200)
>       +- *(7) Project [id#6L, name#7]
>          +- *(7) Filter isnotnull(id#6L)
>             +- *(7) FileScan parquet c[id#6L,name#7] Batched: true, DataFilters: [isnotnull(id#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/c], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
> {noformat}
>  
> The Exchange below the Sort in stage 6 is not required, since the output of Project (stage 5) is already hash-partitioned on id#2L.
> The exchange gets added because the outputPartitioning of Project (stage 5) is HashPartitioning on id#2L, whereas the requiredPartitioning of Sort (stage 6) is HashPartitioning on newid#1L, which is just an alias of id#2L.
> The exchange would not be required if the comparison could resolve the alias newid#1L to the attribute id#2L that it references.
> This issue occurs in the TPC-DS benchmark, query 2.
>  
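The alias comparison described in the report can be illustrated with a small toy model. This is only a sketch of the idea, not Spark's planner code: the names `Attr`, `HashPartitioning`, `resolve_aliases`, and `needs_exchange` are hypothetical stand-ins, and partitionings are compared by simple equality, which is an assumption made to keep the example short.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class Attr:
    """Hypothetical stand-in for an attribute such as id#2L (name plus expression id)."""
    name: str
    expr_id: int


@dataclass(frozen=True)
class HashPartitioning:
    """Hypothetical stand-in for a hash partitioning on some keys."""
    keys: Tuple[Attr, ...]
    num_partitions: int


def resolve_aliases(required: HashPartitioning,
                    aliases: Dict[Attr, Attr]) -> HashPartitioning:
    """Rewrite each key that is an alias to the attribute it references."""
    keys = tuple(aliases.get(key, key) for key in required.keys)
    return HashPartitioning(keys, required.num_partitions)


def needs_exchange(child_output: HashPartitioning,
                   required: HashPartitioning,
                   aliases: Optional[Dict[Attr, Attr]] = None) -> bool:
    """Return True if the child's partitioning does not match the requirement.

    Without an alias map the comparison is purely on attribute identity,
    which mirrors the behavior described in the report.
    """
    if aliases:
        required = resolve_aliases(required, aliases)
    return child_output != required


# Attributes taken from the plan in the report.
id_2 = Attr("id", 2)        # id#2L
newid_1 = Attr("newid", 1)  # newid#1L, an alias of id#2L

child_output = HashPartitioning((id_2,), 200)  # what Project (stage 5) produces
required = HashPartitioning((newid_1,), 200)   # what Sort (stage 6) asks for

# Alias-unaware comparison: an extra exchange is planned.
print(needs_exchange(child_output, required))

# Alias-aware comparison: the existing partitioning is recognized as sufficient.
print(needs_exchange(child_output, required, {newid_1: id_2}))
```

In this model the first call reports that an exchange is needed and the second does not, because the alias map lets `newid#1L` be compared as `id#2L`. How such a mapping would be tracked inside the real planner is left open here.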



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
