Posted to issues@spark.apache.org by "Peter Toth (Jira)" <ji...@apache.org> on 2019/10/07 17:35:00 UTC

[jira] [Created] (SPARK-29375) Exchange reuse across all subquery levels

Peter Toth created SPARK-29375:
----------------------------------

             Summary: Exchange reuse across all subquery levels
                 Key: SPARK-29375
                 URL: https://issues.apache.org/jira/browse/SPARK-29375
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Peter Toth


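Currently, exchange reuse does not work across subquery levels: an Exchange in the main query is not replaced by a ReusedExchange even when an identical Exchange already exists inside a subquery.
Here is an example query: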
{noformat}
SELECT
 (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
 a.key
FROM testData AS a
JOIN testData AS b ON b.key = a.key
{noformat}
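The current plan computes the same hashpartitioning(key#13, 5) Exchange twice, once in the subquery (id=#145) and once in the main query (id=#205):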
{noformat}
*(5) Project [Subquery scalar-subquery#240, [id=#193] AS scalarsubquery()#247, key#13]
:  +- Subquery scalar-subquery#240, [id=#193]
:     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246])
:        +- Exchange SinglePartition, true, [id=#189]
:           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251])
:              +- *(5) Project [key#13]
:                 +- *(5) SortMergeJoin [key#13], [key#243], Inner
:                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
:                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#145]
:                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
:                    :        +- Scan[obj#12]
:                    +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0
:                       +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145]
+- *(5) SortMergeJoin [key#13], [key#241], Inner
   :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#205]
   :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
   :        +- Scan[obj#12]
   +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0
      +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#205]
{noformat}
It could be improved so that the main query reuses the subquery's Exchange (id=#145) instead of computing its own:
{noformat}
*(5) Project [Subquery scalar-subquery#240, [id=#211] AS scalarsubquery()#247, key#13]
:  +- Subquery scalar-subquery#240, [id=#211]
:     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246])
:        +- Exchange SinglePartition, true, [id=#207]
:           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251])
:              +- *(5) Project [key#13]
:                 +- *(5) SortMergeJoin [key#13], [key#243], Inner
:                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
:                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#145]
:                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
:                    :        +- Scan[obj#12]
:                    +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0
:                       +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145]
+- *(5) SortMergeJoin [key#13], [key#241], Inner
   :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- ReusedExchange [key#13], Exchange hashpartitioning(key#13, 5), true, [id=#145]
   +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0
      +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#145]
{noformat}
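The difference between the two plans can be illustrated with a small toy model. This is only a sketch in plain Python, not Spark's implementation: the Node class, the canonical() key and the reuse_exchanges() function are hypothetical names, and the ids 146 and 206 are placeholders for the right-hand join sides, which the plans above show only after they were already replaced. The sketch assumes that reuse is tracked in a lookup table of already-seen exchanges, and contrasts one table per plan level with a single table shared with the subquery plans.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Node:
    """Toy physical-plan node (hypothetical, not a Spark class)."""
    op: str
    children: List["Node"] = field(default_factory=list)
    subqueries: List["Node"] = field(default_factory=list)  # subquery plans attached to this node
    reused_from: Optional[int] = None  # id of the exchange a ReusedExchange points to
    node_id: int = 0


def canonical(node: Node) -> Tuple:
    """Structural key that ignores ids, so identical subtrees compare equal."""
    return (
        node.op,
        tuple(canonical(c) for c in node.children),
        tuple(canonical(s) for s in node.subqueries),
    )


def reuse_exchanges(node: Node, seen: Dict[Tuple, int], across_levels: bool) -> Node:
    """Replace duplicate exchanges with ReusedExchange markers.

    With across_levels=False every subquery gets a fresh lookup table;
    with across_levels=True the same table is shared with subquery plans.
    """
    key = canonical(node) if node.op.startswith("Exchange") else None
    # Subquery plans are visited first, so their exchanges are registered first.
    node.subqueries = [
        reuse_exchanges(s, seen if across_levels else {}, across_levels)
        for s in node.subqueries
    ]
    if key is not None:
        if key in seen:
            return Node("ReusedExchange", reused_from=seen[key])
        seen[key] = node.node_id
    node.children = [reuse_exchanges(c, seen, across_levels) for c in node.children]
    return node


def scan_exchange(node_id: int) -> Node:
    return Node("Exchange hashpartitioning(key, 5)", [Node("Scan")], node_id=node_id)


def self_join(left_id: int, right_id: int) -> Node:
    return Node("SortMergeJoin", [
        Node("Sort", [scan_exchange(left_id)]),
        Node("Sort", [scan_exchange(right_id)]),
    ])


def build_plan() -> Node:
    # Ids 146 and 206 are made-up placeholders for the right-hand sides.
    subquery = Node("HashAggregate max(key)", [self_join(145, 146)])
    return Node("Project", [self_join(205, 206)], subqueries=[subquery])


def exchanges(node: Node) -> List[str]:
    """List exchange-like nodes, subquery plans first, for inspection."""
    out: List[str] = []
    for s in node.subqueries:
        out += exchanges(s)
    if node.op == "ReusedExchange":
        out.append(f"ReusedExchange<-#{node.reused_from}")
    elif node.op.startswith("Exchange"):
        out.append(f"Exchange#{node.node_id}")
    for c in node.children:
        out += exchanges(c)
    return out


per_level = exchanges(reuse_exchanges(build_plan(), {}, across_levels=False))
shared = exchanges(reuse_exchanges(build_plan(), {}, across_levels=True))
print(per_level)
print(shared)
```

With one table per level, the toy plan keeps two real exchanges (#145 and #205), as in the first plan above. With a shared table, only #145 remains and the other three become reuse markers pointing at it, as in the improved plan.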