You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "angerszhu (Jira)" <ji...@apache.org> on 2020/10/13 07:41:00 UTC

[jira] [Comment Edited] (SPARK-33071) Join with ambiguous column succeeding but giving wrong output

    [ https://issues.apache.org/jira/browse/SPARK-33071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17212862#comment-17212862 ] 

angerszhu edited comment on SPARK-33071 at 10/13/20, 7:40 AM:
--------------------------------------------------------------

it is wrong, I am working on this 


was (Author: angerszhuuu):
it is wrong, I am working on this will rase a pr

> Join with ambiguous column succeeding but giving wrong output
> -------------------------------------------------------------
>
>                 Key: SPARK-33071
>                 URL: https://issues.apache.org/jira/browse/SPARK-33071
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4, 3.0.1
>            Reporter: George
>            Priority: Major
>              Labels: correctness
>
> When joining two datasets where one column in each dataset is sourced from the same input dataset, the join successfully runs, but does not select the correct columns, leading to incorrect output.
> Repro using pyspark:
> {code:java}
> sc.version
> import pyspark.sql.functions as F
> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> input_df = spark.createDataFrame(d)
> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> df1 = df1.filter(F.col("key") != F.lit("c"))
> df2 = df2.filter(F.col("key") != F.lit("d"))
> ret = df1.join(df2, df1.key == df2.key, "full").select(
> df1["key"].alias("df1_key"),
> df2["key"].alias("df2_key"),
> df1["sales"],
> df2["units"],
> F.coalesce(df1["key"], df2["key"]).alias("key"))
> ret.show()
> ret.explain(){code}
> output for 2.4.4:
> {code:java}
> >>> sc.version
> u'2.4.4'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.filter(F.col("key") != F.lit("c"))
> >>> df2 = df2.filter(F.col("key") != F.lit("d"))
> >>> ret = df1.join(df2, df1.key == df2.key, "full").select(
> ... df1["key"].alias("df1_key"),
> ... df2["key"].alias("df2_key"),
> ... df1["sales"],
> ... df2["units"],
> ... F.coalesce(df1["key"], df2["key"]).alias("key"))
> 20/10/05 15:46:14 WARN Column: Constructing trivially true equals predicate, 'key#213 = key#213'. Perhaps you need to use aliases.
> >>> ret.show()
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|      d|    3| null|   d|
> |   null|   null| null|    2|null|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+>>> ret.explain()
> == Physical Plan ==
> *(5) Project [key#213 AS df1_key#258, key#213 AS df2_key#259, sales#223L, units#230L, coalesce(key#213, key#213) AS key#260]
> +- SortMergeJoin [key#213], [key#237], FullOuter
>    :- *(2) Sort [key#213 ASC NULLS FIRST], false, 0
>    :  +- *(2) HashAggregate(keys=[key#213], functions=[sum(sales#214L)])
>    :     +- Exchange hashpartitioning(key#213, 200)
>    :        +- *(1) HashAggregate(keys=[key#213], functions=[partial_sum(sales#214L)])
>    :           +- *(1) Project [key#213, sales#214L]
>    :              +- *(1) Filter (isnotnull(key#213) && NOT (key#213 = c))
>    :                 +- Scan ExistingRDD[key#213,sales#214L,units#215L]
>    +- *(4) Sort [key#237 ASC NULLS FIRST], false, 0
>       +- *(4) HashAggregate(keys=[key#237], functions=[sum(units#239L)])
>          +- Exchange hashpartitioning(key#237, 200)
>             +- *(3) HashAggregate(keys=[key#237], functions=[partial_sum(units#239L)])
>                +- *(3) Project [key#237, units#239L]
>                   +- *(3) Filter (isnotnull(key#237) && NOT (key#237 = d))
>                      +- Scan ExistingRDD[key#237,sales#238L,units#239L]
> {code}
> output for 3.0.1:
> {code:java}
> // code placeholder
> >>> sc.version
> u'3.0.1'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> /usr/local/lib/python2.7/site-packages/pyspark/sql/session.py:381: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
>   warnings.warn("inferring schema from dict is deprecated,"
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.filter(F.col("key") != F.lit("c"))
> >>> df2 = df2.filter(F.col("key") != F.lit("d"))
> >>> ret = df1.join(df2, df1.key == df2.key, "full").select(
> ... df1["key"].alias("df1_key"),
> ... df2["key"].alias("df2_key"),
> ... df1["sales"],
> ... df2["units"],
> ... F.coalesce(df1["key"], df2["key"]).alias("key"))
> >>> ret.show()
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|      d|    3| null|   d|
> |   null|   null| null|    2|null|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+>>> ret.explain()
> == Physical Plan ==
> *(5) Project [key#0 AS df1_key#45, key#0 AS df2_key#46, sales#10L, units#17L, coalesce(key#0, key#0) AS key#47]
> +- SortMergeJoin [key#0], [key#24], FullOuter
>    :- *(2) Sort [key#0 ASC NULLS FIRST], false, 0
>    :  +- *(2) HashAggregate(keys=[key#0], functions=[sum(sales#1L)])
>    :     +- Exchange hashpartitioning(key#0, 200), true, [id=#152]
>    :        +- *(1) HashAggregate(keys=[key#0], functions=[partial_sum(sales#1L)])
>    :           +- *(1) Project [key#0, sales#1L]
>    :              +- *(1) Filter (isnotnull(key#0) AND NOT (key#0 = c))
>    :                 +- *(1) Scan ExistingRDD[key#0,sales#1L,units#2L]
>    +- *(4) Sort [key#24 ASC NULLS FIRST], false, 0
>       +- *(4) HashAggregate(keys=[key#24], functions=[sum(units#26L)])
>          +- Exchange hashpartitioning(key#24, 200), true, [id=#158]
>             +- *(3) HashAggregate(keys=[key#24], functions=[partial_sum(units#26L)])
>                +- *(3) Project [key#24, units#26L]
>                   +- *(3) Filter (isnotnull(key#24) AND NOT (key#24 = d))
>                      +- *(3) Scan ExistingRDD[key#24,sales#25L,units#26L]{code}
> key#0 is the reference used for both alias operations and both sides of the coalesce, despite the query plan projecting key#24 for the right side of the join.
> Concretely, I believe the output of the join should be this:
> {code:java}
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|   null|    3| null|   d|
> |   null|      c| null|    2|   c|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org