Posted to issues@spark.apache.org by "Jurriaan Pruis (JIRA)" <ji...@apache.org> on 2016/06/28 10:51:57 UTC

[jira] [Updated] (SPARK-16252) Full Outer join with literal column results in incorrect result

     [ https://issues.apache.org/jira/browse/SPARK-16252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jurriaan Pruis updated SPARK-16252:
-----------------------------------
    Description: 
{code}
>>> from pyspark.sql.functions import lit, coalesce
>>> data1 = [[1,2], [3, 4], [5, 6]]
>>> data2 = [[1,2], [5, 6]]
>>> df1 = sqlContext.createDataFrame(data1, ["a", "b"])
>>> df2 = sqlContext.createDataFrame(data2, ["a", "b"])
>>> df2 = df2.withColumn('type', lit('type 2'))
>>> df1.join(df2, ['a', 'b'], 'outer').select('a', 'b', coalesce(df2['type'], lit('type 1')).alias('type')).show()
+---+---+------+
|  a|  b|  type|
+---+---+------+
|  5|  6|type 2|
|  3|  4|type 2|
|  1|  2|type 2|
+---+---+------+
{code}

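The expected result is shown below. Row (3, 4) exists only in df1, so after the full outer join df2['type'] should be null for that row, and coalesce should fall back to 'type 1':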

{code}
+---+---+------+
|  a|  b|  type|
+---+---+------+
|  5|  6|type 2|
|  3|  4|type 1|
|  1|  2|type 2|
+---+---+------+
{code}
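To make the expected semantics concrete, here is a minimal pure-Python model of the query. It does not use Spark. The helpers `coalesce` and `full_outer_join` are hypothetical names written for illustration. Each side is a dict keyed by the join key (a, b), and join keys are assumed to be unique. The side without a match is null-padded (None), which is what a full outer join should do. Under that model, row (3, 4) gets 'type 1'.

```python
# Hypothetical pure-Python model of the reported query; no Spark involved.
# Each side is a dict keyed by the join key (a, b); join keys are assumed unique.

def coalesce(*values):
    """Return the first non-None argument, like SQL COALESCE."""
    return next((v for v in values if v is not None), None)


def full_outer_join(left, right):
    """Full outer join on the dict keys.

    Every key from either side appears exactly once; the side without a
    match contributes None, mirroring how an outer join null-pads rows.
    """
    return {k: (left.get(k), right.get(k)) for k in left.keys() | right.keys()}


# df1 has no non-key columns, so each row's payload is an empty dict.
df1 = {(1, 2): {}, (3, 4): {}, (5, 6): {}}
# df2 gets the literal column before the join, like withColumn('type', lit('type 2')).
df2 = {(1, 2): {"type": "type 2"}, (5, 6): {"type": "type 2"}}

joined = full_outer_join(df1, df2)
result = sorted(
    (a, b, coalesce(None if right is None else right["type"], "type 1"))
    for (a, b), (_, right) in joined.items()
)
print(result)  # [(1, 2, 'type 2'), (3, 4, 'type 1'), (5, 6, 'type 2')]
```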

The generated query plan is:
{code}
== Parsed Logical Plan ==
'Project [unresolvedalias('a, None), unresolvedalias('b, None), coalesce(type#43, type 1) AS type#74]
+- Project [coalesce(a#33L, a#38L) AS a#71L, coalesce(b#34L, b#39L) AS b#72L, type#43]
   +- Join FullOuter, ((a#33L = a#38L) && (b#34L = b#39L))
      :- LogicalRDD [a#33L, b#34L]
      +- Project [a#38L, b#39L, type 2 AS type#43]
         +- LogicalRDD [a#38L, b#39L]

== Analyzed Logical Plan ==
a: bigint, b: bigint, type: string
Project [a#71L, b#72L, coalesce(type#43, type 1) AS type#74]
+- Project [coalesce(a#33L, a#38L) AS a#71L, coalesce(b#34L, b#39L) AS b#72L, type#43]
   +- Join FullOuter, ((a#33L = a#38L) && (b#34L = b#39L))
      :- LogicalRDD [a#33L, b#34L]
      +- Project [a#38L, b#39L, type 2 AS type#43]
         +- LogicalRDD [a#38L, b#39L]

== Optimized Logical Plan ==
Project [coalesce(a#33L, a#38L) AS a#71L, coalesce(b#34L, b#39L) AS b#72L, type 2 AS type#74]
+- Join FullOuter, ((a#33L = a#38L) && (b#34L = b#39L))
   :- LogicalRDD [a#33L, b#34L]
   +- LogicalRDD [a#38L, b#39L]

== Physical Plan ==
*Project [coalesce(a#33L, a#38L) AS a#71L, coalesce(b#34L, b#39L) AS b#72L, type 2 AS type#74]
+- SortMergeJoin [a#33L, b#34L], [a#38L, b#39L], FullOuter
   :- *Sort [a#33L ASC, b#34L ASC], false, 0
   :  +- Exchange hashpartitioning(a#33L, b#34L, 200)
   :     +- Scan ExistingRDD[a#33L,b#34L]
   +- *Sort [a#38L ASC, b#39L ASC], false, 0
      +- Exchange hashpartitioning(a#38L, b#39L, 200)
         +- Scan ExistingRDD[a#38L,b#39L]
{code}

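As you can see, the plan is already wrong at the Optimized Logical Plan stage, and the physical plan inherits the error. The optimizer moved the 'type 2' literal from the Project below the join (type 2 AS type#43) into the final Project above the FullOuter join (type 2 AS type#74), and the coalesce disappeared. That substitution is only valid if the right side of the join is never null-padded. Row (3, 4) has no match in df2, so type#43 should be null for it and the result should be 'type 1'.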
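The difference between the analyzed and optimized plans can be reproduced without Spark. The sketch below uses plain Python and hypothetical helper names. It evaluates the final projection in two ways. The first reads `type` from the (possibly null-padded) joined row, as in the Analyzed Logical Plan. The second substitutes the literal 'type 2' above the join, as in the Optimized Logical Plan. This models only the plans' semantics, not Spark's optimizer internals. The report does not say which optimizer rule performs the substitution.

```python
# Hypothetical comparison of the analyzed vs. optimized projection; no Spark involved.

def coalesce(*values):
    """Return the first non-None argument, like SQL COALESCE."""
    return next((v for v in values if v is not None), None)


def full_outer_join(left, right):
    """Full outer join on dict keys; the unmatched side contributes None."""
    return {k: (left.get(k), right.get(k)) for k in left.keys() | right.keys()}


df1 = {(1, 2): {}, (3, 4): {}, (5, 6): {}}
df2 = {(1, 2): {"type": "type 2"}, (5, 6): {"type": "type 2"}}
joined = full_outer_join(df1, df2)


def analyzed(row):
    # Analyzed plan: coalesce(type#43, 'type 1'), where type#43 comes from the
    # right side of the join and is None when that side was null-padded.
    _, right = row
    return coalesce(None if right is None else right["type"], "type 1")


def optimized(_row):
    # Optimized plan: type#43 was replaced by the literal 'type 2' above the
    # join, so coalesce always sees a non-null first argument.
    return coalesce("type 2", "type 1")


correct = {k: analyzed(v) for k, v in joined.items()}
buggy = {k: optimized(v) for k, v in joined.items()}
diff = sorted(k for k in joined if correct[k] != buggy[k])
print(diff)  # [(3, 4)]
```

Only the row with no match on the literal-carrying side differs. This is consistent with the observed output, where (3, 4) shows 'type 2' instead of 'type 1'.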


  was:
{code}
>>> from pyspark.sql.functions import lit, coalesce
>>> data1 = [[1,2], [3, 4], [5, 6]]
>>> data2 = [[1,2], [5, 6]]
>>> df1 = sqlContext.createDataFrame(data1, ["a", "b"])
>>> df2 = sqlContext.createDataFrame(data2, ["a", "b"])
>>> df2 = df2.withColumn('type', lit('type 2'))
>>> df1.join(df2, ['a', 'b'], 'outer').select('a', 'b', coalesce(df2['type'], lit('type 1')).alias('type')).show()
+---+---+------+
|  a|  b|  type|
+---+---+------+
|  5|  6|type 2|
|  3|  4|type 2|
|  1|  2|type 2|
+---+---+------+
{code}

While the expected result would be

{code}
>>> df1.join(df2, ['a', 'b'], 'outer').select('a', 'b', coalesce(df2['type'], lit('type 1')).alias('type')).show()
+---+---+------+
|  a|  b|  type|
+---+---+------+
|  5|  6|type 2|
|  3|  4|type 1|
|  1|  2|type 2|
+---+---+------+
{code}

The generated query plan is:
{code}
== Parsed Logical Plan ==
'Project [unresolvedalias('a, None), unresolvedalias('b, None), coalesce(type#43, type 1) AS type#74]
+- Project [coalesce(a#33L, a#38L) AS a#71L, coalesce(b#34L, b#39L) AS b#72L, type#43]
   +- Join FullOuter, ((a#33L = a#38L) && (b#34L = b#39L))
      :- LogicalRDD [a#33L, b#34L]
      +- Project [a#38L, b#39L, type 2 AS type#43]
         +- LogicalRDD [a#38L, b#39L]

== Analyzed Logical Plan ==
a: bigint, b: bigint, type: string
Project [a#71L, b#72L, coalesce(type#43, type 1) AS type#74]
+- Project [coalesce(a#33L, a#38L) AS a#71L, coalesce(b#34L, b#39L) AS b#72L, type#43]
   +- Join FullOuter, ((a#33L = a#38L) && (b#34L = b#39L))
      :- LogicalRDD [a#33L, b#34L]
      +- Project [a#38L, b#39L, type 2 AS type#43]
         +- LogicalRDD [a#38L, b#39L]

== Optimized Logical Plan ==
Project [coalesce(a#33L, a#38L) AS a#71L, coalesce(b#34L, b#39L) AS b#72L, type 2 AS type#74]
+- Join FullOuter, ((a#33L = a#38L) && (b#34L = b#39L))
   :- LogicalRDD [a#33L, b#34L]
   +- LogicalRDD [a#38L, b#39L]

== Physical Plan ==
*Project [coalesce(a#33L, a#38L) AS a#71L, coalesce(b#34L, b#39L) AS b#72L, type 2 AS type#74]
+- SortMergeJoin [a#33L, b#34L], [a#38L, b#39L], FullOuter
   :- *Sort [a#33L ASC, b#34L ASC], false, 0
   :  +- Exchange hashpartitioning(a#33L, b#34L, 200)
   :     +- Scan ExistingRDD[a#33L,b#34L]
   +- *Sort [a#38L ASC, b#39L ASC], false, 0
      +- Exchange hashpartitioning(a#38L, b#39L, 200)
         +- Scan ExistingRDD[a#38L,b#39L]
{code}

As you can see the physical query plan is clearly wrong! It moved the 'type 2' literal to the final Project step.



> Full Outer join with literal column results in incorrect result
> ---------------------------------------------------------------
>
>                 Key: SPARK-16252
>                 URL: https://issues.apache.org/jira/browse/SPARK-16252
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: Latest Spark master
>            Reporter: Jurriaan Pruis
>



