Posted to issues@spark.apache.org by "GUAN Hao (JIRA)" <ji...@apache.org> on 2016/08/03 10:17:20 UTC

[jira] [Updated] (SPARK-16869) Wrong projection when join on columns with the same name which are derived from the same dataframe

     [ https://issues.apache.org/jira/browse/SPARK-16869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

GUAN Hao updated SPARK-16869:
-----------------------------
    Description: 
I have two DataFrames, both of which contain a column named *i* derived from the same DataFrame (via a join).

{code}
b
+---+---+---+---+
|  j|  p|  i|  k|
+---+---+---+---+
|  3|  2|  3|  3|
|  2|  1|  2|  2|
+---+---+---+---+

c
+---+---+---+---+
|  j|  k|  q|  i|
+---+---+---+---+
|  1|  1|  0|  1|
|  2|  2|  1|  2|
+---+---+---+---+
{code}

The expected result of a full OUTER join of the two DataFrames above is:

{code}
i = coalesce(b.i, c.i)
+----+----+----+---+---+----+----+
| b_i| c_i|   i|  j|  k|   p|   q|
+----+----+----+---+---+----+----+
|   2|   2|   2|  2|  2|   1|   1|
|null|   1|   1|  1|  1|null|   0|
|   3|null|   3|  3|  3|   2|null|
+----+----+----+---+---+----+----+
{code}
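For reference, the expected table can be checked without Spark. The sketch below is plain Python (no PySpark) and works in two steps. First, it runs a naive nested-loop full outer join of the {{b}} and {{c}} rows shown above on {{(i, j, k)}}. Then it applies the same projection as the {{select}} in the repro. The helper names ({{full_outer_join}}, {{project}}, {{get}}) are illustrative only. Row order is not meaningful here, just as in Spark.

```python
# Rows of b and c exactly as printed above.
b_rows = [
    {"j": 3, "p": 2, "i": 3, "k": 3},
    {"j": 2, "p": 1, "i": 2, "k": 2},
]
c_rows = [
    {"j": 1, "k": 1, "q": 0, "i": 1},
    {"j": 2, "k": 2, "q": 1, "i": 2},
]
KEYS = ("i", "j", "k")


def coalesce(*values):
    """Return the first non-None argument, like SQL COALESCE."""
    return next((v for v in values if v is not None), None)


def full_outer_join(left, right, keys):
    """Nested-loop FULL OUTER JOIN on equal `keys`; the missing side is None."""
    matched_right = set()
    for left_row in left:
        matches = [idx for idx, right_row in enumerate(right)
                   if all(left_row[key] == right_row[key] for key in keys)]
        matched_right.update(matches)
        if matches:
            for idx in matches:
                yield left_row, right[idx]
        else:
            yield left_row, None
    # Right rows that never matched are emitted with a NULL left side.
    for idx, right_row in enumerate(right):
        if idx not in matched_right:
            yield None, right_row


def get(row, col):
    """Column value, or None (SQL NULL) when that side of the join is absent."""
    return None if row is None else row[col]


def project(left_row, right_row):
    """Same output columns as the select() in the repro."""
    return {
        "b_i": get(left_row, "i"),
        "c_i": get(right_row, "i"),
        "i": coalesce(get(left_row, "i"), get(right_row, "i")),
        "j": coalesce(get(left_row, "j"), get(right_row, "j")),
        "k": coalesce(get(left_row, "k"), get(right_row, "k")),
        "p": get(left_row, "p"),
        "q": get(right_row, "q"),
    }


expected = [project(l_row, r_row)
            for l_row, r_row in full_outer_join(b_rows, c_rows, KEYS)]
for row in expected:
    print(row)
```

Each dict corresponds to one row of the expected table. {{c_i}} is NULL for the {{i = 3}} row, and {{b_i}} is NULL for the {{i = 1}} row. Those are exactly the cells where the actual output differs.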

However, what I got is:

{code}
+----+----+----+---+---+----+----+
| b_i| c_i|   i|  j|  k|   p|   q|
+----+----+----+---+---+----+----+
|   2|   2|   2|  2|  2|   1|   1|
|null|null|null|  1|  1|null|   0|
|   3|   3|   3|  3|  3|   2|null|
+----+----+----+---+---+----+----+
{code}

{code}
== Physical Plan ==
*Project [i#0L AS b_i#146L, i#0L AS c_i#147L, coalesce(i#0L, i#0L) AS i#148L, coalesce(j#12L, j#21L) AS j#149L, coalesce(k#2L, k#22L) AS k#150L, p#13L, q#23L]
+- SortMergeJoin [i#0L, j#12L, k#2L], [i#113L, j#21L, k#22L], FullOuter
....
{code}

As shown in the plan, columns {{b.i}} and {{c.i}} are correctly resolved to {{i#0L}} and {{i#113L}}, respectively, in the join condition. However, in the projection, both {{b.i}} and {{c.i}} are resolved to {{i#0L}}. As a result, {{c_i}} and the coalesced {{i}} both take their values from {{b}}.
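The mechanism can be illustrated with a toy model in plain Python. This is not Spark's actual code, and {{Attr}}, {{dedup_right}}, {{resolve_by_name}} and {{resolve_by_id}} are hypothetical names. The model rests on three assumptions:
* When both join inputs expose the same attribute {{i#0}}, the analyzer gives the right side's copy a fresh ID.
* The Column handles {{b.i}} and {{c.i}} were both captured before the join, so both still carry ID 0.
* An equality predicate whose two sides are the same reference is re-resolved by name against the left and right inputs. This is my reading of Spark 2.0's {{Dataset.join}}.

The fresh ID 113 is chosen only to mirror the plan above.

```python
from dataclasses import dataclass
from itertools import count
from typing import Iterator, List


@dataclass(frozen=True)
class Attr:
    """A resolved column: a name plus a unique expression ID (the 0 in i#0L)."""
    name: str
    expr_id: int


def dedup_right(left: List[Attr], right: List[Attr], fresh: Iterator[int]) -> List[Attr]:
    """Give right-side attributes whose IDs clash with the left side a fresh ID."""
    left_ids = {a.expr_id for a in left}
    return [Attr(a.name, next(fresh)) if a.expr_id in left_ids else a for a in right]


def resolve_by_name(name: str, side: List[Attr]) -> Attr:
    """Look a column up by name in one join input."""
    return next(a for a in side if a.name == name)


def resolve_by_id(handle: Attr, output: List[Attr]) -> Attr:
    """Bind a previously captured column handle by expression ID."""
    return next(a for a in output if a.expr_id == handle.expr_id)


# Outputs of b and c, using the IDs from the physical plan above.
a_i = Attr("i", 0)  # table_a.i, shared by b and c
b_out = [Attr("j", 12), Attr("p", 13), a_i, Attr("k", 2)]
c_out = [Attr("j", 21), Attr("k", 22), Attr("q", 23), a_i]

# (1) Self-join deduplication: c's copy of i#0 gets a fresh ID.
c_dedup = dedup_right(b_out, c_out, count(113))
joined = b_out + c_dedup

# (2) b.i and c.i were captured before the join, so both are i#0.
b_i, c_i = a_i, a_i

# (3) Join condition b.i == c.i: same reference on both sides, so it is
# re-resolved by name, left against b and right against the deduplicated c.
cond = (resolve_by_name(b_i.name, b_out), resolve_by_name(c_i.name, c_dedup))

# Projection: the handles are bound by ID, and only b's i still has ID 0.
proj = (resolve_by_id(b_i, joined), resolve_by_id(c_i, joined))

print("join condition:", cond)
print("projection:    ", proj)
```

Under these assumptions the model reproduces the plan. The join condition pairs {{i#0}} with {{i#113}}, while both projected columns bind to {{i#0}}.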

Complete code to reproduce:

{code}
from pyspark import SparkContext, SQLContext

from pyspark.sql import Row, functions

sc = SparkContext()
sqlContext = SQLContext(sc)

data_a = sc.parallelize([
    Row(i=1, j=1, k=1),
    Row(i=2, j=2, k=2),
    Row(i=3, j=3, k=3),
])
table_a = sqlContext.createDataFrame(data_a)
table_a.show()

data_b = sc.parallelize([
    Row(j=2, p=1),
    Row(j=3, p=2),
])
table_b = sqlContext.createDataFrame(data_b)
table_b.show()

data_c = sc.parallelize([
    Row(j=1, k=1, q=0),
    Row(j=2, k=2, q=1),
])
table_c = sqlContext.createDataFrame(data_c)
table_c.show()

b = table_b.join(table_a, table_b.j == table_a.j).drop(table_a.j)

c = table_c.join(table_a, (table_c.j == table_a.j)
                  & (table_c.k == table_a.k)) \
    .drop(table_a.j) \
    .drop(table_a.k)


b.show()
c.show()

result = b.join(c, (b.i == c.i)
              & (b.j == c.j)
              & (b.k == c.k), 'outer') \
    .select(
        b.i.alias('b_i'),
        c.i.alias('c_i'),
        functions.coalesce(b.i, c.i).alias('i'),
        functions.coalesce(b.j, c.j).alias('j'),
        functions.coalesce(b.k, c.k).alias('k'),
        b.p,
        c.q,
    )

result.explain()
result.show()
{code}



> Wrong projection when join on columns with the same name which are derived from the same dataframe
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16869
>                 URL: https://issues.apache.org/jira/browse/SPARK-16869
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: GUAN Hao
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
