Posted to user@spark.apache.org by Abdeali Kothari <ab...@gmail.com> on 2022/01/05 16:10:43 UTC

Re: Spark 3.2 - ReusedExchange not present in join execution plan

Just thought I'd do a quick bump and add the dev mailing list, in case
there is some insight there.
Feels like this should be categorized as a bug in Spark 3.2.0.

On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari <ab...@gmail.com>
wrote:

> Hi,
> I am using PySpark for some projects. One of the things we do is use the
> execution plan to find the tables/columns that Spark uses.
>
> After upgrading to Spark 3.2, the physical plan looks different from
> previous versions, mainly when we are doing joins.
> Below is a reproducible example (you can run the same code in versions 2.3
> to 3.1 to see the difference).
>
> My original DataFrames have the columns id#0 and id#4.
> But after the joins we see new columns, id#19 and id#34, which
> do not come from the original DataFrames I was working with.
> In previous versions of Spark, the plan used a ReusedExchange step here
> (shown below).
>
> I am trying to understand whether this is expected in Spark 3.2, where the
> execution plan seems to create a new data source that does not
> originate from the df1 and df2 I provided.
> NOTE: The same happens even if I read from Parquet files.
>
> In Spark 3.2:
> In [1]: import pyspark
>    ...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>
> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>    ...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
>    ...: df1.explain()
>    ...: df2.explain()
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#0L,col1#1L]
>
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#4L,col2#5L]
>
> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>    ...: df4 = df2.join(df3, df1['id'] == df2['id'])
>    ...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#53]
>    :     +- Filter isnotnull(id#4L)
>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>    +- Project [id#0L, col1#1L, col2#20L]
>       +- SortMergeJoin [id#0L], [id#19L], Inner
>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#45]
>          :     +- Filter isnotnull(id#0L)
>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>          +- Sort [id#19L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>                +- Filter isnotnull(id#19L)
>                   +- Scan ExistingRDD[id#19L,col2#20L]
>
> In [4]: df1.createOrReplaceTempView('df1')
>    ...: df2.createOrReplaceTempView('df2')
>    ...: df3 = spark.sql("""
>    ...:     SELECT df1.id, df1.col1, df2.col2
>    ...:     FROM df1 JOIN df2 ON df1.id = df2.id
>    ...: """)
>    ...: df3.createOrReplaceTempView('df3')
>    ...: df4 = spark.sql("""
>    ...:     SELECT df2.*, df3.*
>    ...:     FROM df2 JOIN df3 ON df2.id = df3.id
>    ...: """)
>    ...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#110]
>    :     +- Filter isnotnull(id#4L)
>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>    +- Project [id#0L, col1#1L, col2#35L]
>       +- SortMergeJoin [id#0L], [id#34L], Inner
>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#102]
>          :     +- Filter isnotnull(id#0L)
>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>          +- Sort [id#34L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>                +- Filter isnotnull(id#34L)
>                   +- Scan ExistingRDD[id#34L,col2#35L]
>
>
> In Spark 3.1.1, the plan for the same code is:
>
> *(8) SortMergeJoin [id#4L], [id#0L], Inner
> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
> :     +- *(1) Filter isnotnull(id#4L)
> :        +- *(1) Scan ExistingRDD[id#4L,col2#5L]
> +- *(7) Project [id#0L, col1#1L, col2#20L]
>    +- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>       :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#62]
>       :     +- *(3) Filter isnotnull(id#0L)
>       :        +- *(3) Scan ExistingRDD[id#0L,col1#1L]
>       +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
>          +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>
>
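
For the use case described in this message (finding which columns a plan reads), a minimal sketch of pulling attribute names and expression IDs out of explain() text might look like the following. The helper names scanned_attributes and has_reused_exchange are hypothetical, and the regular expressions only cover the "Scan ExistingRDD[...]" form shown in this thread; other sources (for example Parquet scans) print differently and would need their own patterns.

```python
import re

# Matches the column list of a scan node, e.g. "Scan ExistingRDD[id#4L,col2#5L]".
SCAN_RE = re.compile(r"Scan ExistingRDD\[([^\]]*)\]")
# Matches one attribute such as "id#4L": name, "#", expression ID, optional "L".
ATTR_RE = re.compile(r"^(\w+)#(\d+)L?$")


def scanned_attributes(plan_text):
    """Return the set of (name, expr_id) pairs read by Scan ExistingRDD nodes."""
    found = set()
    for column_list in SCAN_RE.findall(plan_text):
        for column in column_list.split(","):
            match = ATTR_RE.match(column.strip())
            if match:
                found.add((match.group(1), int(match.group(2))))
    return found


def has_reused_exchange(plan_text):
    """True if any node in the plan text is a ReusedExchange."""
    return "ReusedExchange" in plan_text


# Abbreviated fragment of the Spark 3.2 plan quoted above (Exchange and
# Filter nodes left out for brevity).
plan_32 = """\
+- SortMergeJoin [id#0L], [id#19L], Inner
   :- Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Scan ExistingRDD[id#0L,col1#1L]
   +- Sort [id#19L ASC NULLS FIRST], false, 0
      +- Scan ExistingRDD[id#19L,col2#20L]
"""
print(sorted(scanned_attributes(plan_32)))
print(has_reused_exchange(plan_32))
```

On the 3.2 plan this reports id#19 and col2#20 as scanned attributes, which is exactly the pair that does not match the IDs printed for df2 on its own (id#4, col2#5).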

Re: Spark 3.2 - ReusedExchange not present in join execution plan

Posted by Abdeali Kothari <ab...@gmail.com>.
Thanks a lot for the reply, Albert.

Looking at it and reading about it further, I do see that
"AdaptiveSparkPlan isFinalPlan=false" is mentioned.

Could you point me to how I can see the final plan? I couldn't find that
in any of the resources I was referring to.
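
One way that should work, as far as I understand adaptive query execution: run an action on the same DataFrame object and call explain() on it again, since the adaptive plan is only finalized while the query executes. The sketch below assumes that behaviour; the helper names (explain_text, is_final_plan, final_plan_text) are made up for illustration, and it relies on PySpark's explain() printing through Python's stdout.

```python
import contextlib
import io


def explain_text(df):
    """Capture what df.explain() prints and return it as a string."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def is_final_plan(plan_text):
    """True if the plan has no AQE node, or AQE marks the plan as final."""
    if "AdaptiveSparkPlan" not in plan_text:
        return True
    return "isFinalPlan=true" in plan_text


def final_plan_text(df):
    """Run an action so AQE can finalize the plan, then return the explain text.

    Assumption: collect() executes the DataFrame's own query, so a later
    explain() on the same object reflects the finalized plan.
    """
    df.collect()  # pulls every row to the driver; fine for small examples only
    return explain_text(df)
```

With the df4 from the example, final_plan_text(df4) would be expected to start with "AdaptiveSparkPlan isFinalPlan=true". The SQL tab of the Spark UI also shows the plan of an executed query.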

On Fri, 7 Jan 2022, 07:25 Albert, <zi...@gmail.com> wrote:

> I happened to encounter something similar.
>
> It's probably because you are only calling `explain` on it. When you
> actually run it, you will get the final Spark plan, in which the exchange
> will be reused.
> Right, this differs from 3.1, probably because of the upgraded
> AQE.
>
> Not sure whether this is expected, though.

Re: Spark 3.2 - ReusedExchange not present in join execution plan

Posted by Albert <zi...@gmail.com>.
I happened to encounter something similar.

It's probably because you are only calling `explain` on it. When you
actually run it, you will get the final Spark plan, in which the exchange
will be reused.
Right, this differs from 3.1, probably because of the upgraded
AQE.

Not sure whether this is expected, though.
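
For context, adaptive query execution (spark.sql.adaptive.enabled) is on by default starting with Spark 3.2.0. To compare against the 3.1-style output, a rough sketch is to switch it off temporarily and rebuild the DataFrame before calling explain(). The helper explain_without_aqe and its build_df argument are hypothetical names; I have not confirmed that this brings back ReusedExchange for the exact example in this thread.

```python
AQE_KEY = "spark.sql.adaptive.enabled"


def explain_without_aqe(spark, build_df):
    """Print the plan of build_df(spark) with AQE switched off.

    build_df is a callable that constructs the DataFrame from scratch, so
    that physical planning happens while the setting is disabled.
    """
    previous = spark.conf.get(AQE_KEY)
    spark.conf.set(AQE_KEY, "false")
    try:
        build_df(spark).explain()
    finally:
        # Restore whatever the session had before.
        spark.conf.set(AQE_KEY, previous)
```

Here build_df would recreate df4 from the example. A DataFrame that has already been planned keeps its plan, which is why the helper takes a builder and not an existing DataFrame.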

-- 
~~~~~~~~~~~~~~~
no mistakes
~~~~~~~~~~~~~~~~~~
