Posted to issues@spark.apache.org by "chie hayashida (JIRA)" <ji...@apache.org> on 2017/01/07 08:13:58 UTC

[jira] [Commented] (SPARK-17154) Wrong result can be returned or AnalysisException can be thrown after self-join or similar operations

    [ https://issues.apache.org/jira/browse/SPARK-17154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15807077#comment-15807077 ] 

chie hayashida commented on SPARK-17154:
----------------------------------------

[~nsyca], [~cloud_fan], [~sarutak]
I have example code below.

# Example 1

```scala
scala> val df = Seq((1,1,1),(1,2,3),(1,4,5),(2,2,4),(2,5,7),(2,8,8)).toDF("id","value1","value2")
df: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 1 more field]

scala> val df2 = df
df2: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 1 more field]

scala> val df3 = df.join(df2,df("id") === df2("id") && df("value2") <= df2("value2"))
17/01/07 16:29:26 WARN Column: Constructing trivially true equals predicate, 'id#171 = id#171'. Perhaps you need to use aliases.
df3: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 4 more fields]

scala> df3.show
+---+------+------+---+------+------+
| id|value1|value2| id|value1|value2|
+---+------+------+---+------+------+
|  1|     1|     1|  1|     4|     5|
|  1|     1|     1|  1|     2|     3|
|  1|     1|     1|  1|     1|     1|
|  1|     2|     3|  1|     4|     5|
|  1|     2|     3|  1|     2|     3|
|  1|     2|     3|  1|     1|     1|
|  1|     4|     5|  1|     4|     5|
|  1|     4|     5|  1|     2|     3|
|  1|     4|     5|  1|     1|     1|
|  2|     2|     4|  2|     8|     8|
|  2|     2|     4|  2|     5|     7|
|  2|     2|     4|  2|     2|     4|
|  2|     5|     7|  2|     8|     8|
|  2|     5|     7|  2|     5|     7|
|  2|     5|     7|  2|     2|     4|
|  2|     8|     8|  2|     8|     8|
|  2|     8|     8|  2|     5|     7|
|  2|     8|     8|  2|     2|     4|
+---+------+------+---+------+------+


scala> df3.explain
== Physical Plan ==
*BroadcastHashJoin [id#171], [id#178], Inner, BuildRight
:- LocalTableScan [id#171, value1#172, value2#173]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#178, value1#179, value2#180]
```

# Example 2
```scala
scala> val df = Seq((1,1,1),(1,2,3),(1,4,5),(2,2,4),(2,5,7),(2,8,8)).toDF("id","value1","value2")
df: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 1 more field]

scala> val df2 = df.select($"id".as("id2"),$"value1".as("value11"),$"value2".as("value22"))
df2: org.apache.spark.sql.DataFrame = [id2: int, value11: int ... 1 more field]

scala> val df3 = df.join(df2,df("id") === df2("id2") && df("value2") <= df2("value22"))
df3: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 4 more fields]

scala> df3.show
+---+------+------+---+-------+-------+
| id|value1|value2|id2|value11|value22|
+---+------+------+---+-------+-------+
|  1|     1|     1|  1|      4|      5|
|  1|     1|     1|  1|      2|      3|
|  1|     1|     1|  1|      1|      1|
|  1|     2|     3|  1|      4|      5|
|  1|     2|     3|  1|      2|      3|
|  1|     4|     5|  1|      4|      5|
|  2|     2|     4|  2|      8|      8|
|  2|     2|     4|  2|      5|      7|
|  2|     2|     4|  2|      2|      4|
|  2|     5|     7|  2|      8|      8|
|  2|     5|     7|  2|      5|      7|
|  2|     8|     8|  2|      8|      8|
+---+------+------+---+-------+-------+

scala> df3.explain
== Physical Plan ==
*BroadcastHashJoin [id#171], [id2#243], Inner, BuildRight, (value2#173 <= value22#245)
:- LocalTableScan [id#171, value1#172, value2#173]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id2#243, value11#244, value22#245]

```

The contents of df3 differ between Example 1 and Example 2.

I think the cause of this is the same as in SPARK-17154.

In the above case, I understand that the result of Example 1 is incorrect and that of Example 2 is correct.
But this issue isn't trivial, and some developers may overlook this buggy code.
I think permanent action should be taken for this issue.
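For reference, the expected semantics of the join condition can be checked with a plain-Python emulation over the raw tuples (this is not Spark code, just a sanity check of what the condition should return):

```python
# Rows used in both examples: (id, value1, value2)
rows = [(1, 1, 1), (1, 2, 3), (1, 4, 5), (2, 2, 4), (2, 5, 7), (2, 8, 8)]

# Full join condition: id equality AND left.value2 <= right.value2
correct = [(l, r) for l in rows for r in rows
           if l[0] == r[0] and l[2] <= r[2]]

# Example 1's physical plan keeps only the id equality,
# silently dropping the <= predicate
buggy = [(l, r) for l in rows for r in rows if l[0] == r[0]]

print(len(correct))  # 12 rows, matching Example 2's output
print(len(buggy))    # 18 rows, matching Example 1's output
```

This confirms that the 12-row result of Example 2 is what the predicate actually describes, while the 18-row result of Example 1 is a plain id-equality join.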

> Wrong result can be returned or AnalysisException can be thrown after self-join or similar operations
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17154
>                 URL: https://issues.apache.org/jira/browse/SPARK-17154
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2, 2.0.0
>            Reporter: Kousuke Saruta
>         Attachments: Name-conflicts-2.pdf, Solution_Proposal_SPARK-17154.pdf
>
>
> When we join two DataFrames that are derived from the same DataFrame, operations on the joined DataFrame can fail.
> One reproducible example is as follows.
> {code}
> val df = Seq(
>   (1, "a", "A"),
>   (2, "b", "B"),
>   (3, "c", "C"),
>   (4, "d", "D"),
>   (5, "e", "E")).toDF("col1", "col2", "col3")
>   val filtered = df.filter("col1 != 3").select("col1", "col2")
>   val joined = filtered.join(df, filtered("col1") === df("col1"), "inner")
>   val selected1 = joined.select(df("col3"))
> {code}
> In this case, AnalysisException is thrown.
> Another example is as follows.
> {code}
> val df = Seq(
>   (1, "a", "A"),
>   (2, "b", "B"),
>   (3, "c", "C"),
>   (4, "d", "D"),
>   (5, "e", "E")).toDF("col1", "col2", "col3")
>   val filtered = df.filter("col1 != 3").select("col1", "col2")
>   val rightOuterJoined = filtered.join(df, filtered("col1") === df("col1"), "right")
>   val selected2 = rightOuterJoined.select(df("col1"))
>   selected2.show
> {code}
> In this case, we will expect to get the answer like as follows.
> {code}
> 1
> 2
> 3
> 4
> 5
> {code}
> But the actual result is as follows.
> {code}
> 1
> 2
> null
> 4
> 5
> {code}
> The cause of the problems in these examples is that the logical plan for the right-side DataFrame, and the expressions of its output, are re-created in the analyzer (by the ResolveReferences rule) when a DataFrame has expressions that share the same exprId with each other.
> The re-created expressions are equal to the original ones except for their exprId.
> This happens when we do a self-join or a similar pattern of operations.
> In the first example, df("col3") returns a Column that wraps an expression, and that expression has an exprId (say id1 here).
> After the join, the expression held by the right-side DataFrame (df) is re-created; the old and new expressions are equal except that the exprId is renewed (say id2 for the new exprId here).
> Because of the mismatch between those exprIds, AnalysisException is thrown.
> In the second example, df("col1") returns a Column whose expression is assigned an exprId (say id3).
> On the other hand, the Column returned by filtered("col1") has an expression with the same exprId (id3).
> After the join, the expressions in the right-side DataFrame are re-created, so the expression assigned id3 is no longer present on the right side but is still present on the left side.
> So, when we refer to df("col1") on the joined DataFrame, we get the left side's col1, which includes null.
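To make the second quoted example concrete, the expected versus actual behavior can be emulated in plain Python (an illustrative sketch of the right outer join semantics, not Spark code):

```python
# Rows from the quoted report: (col1, col2, col3)
df = [(1, "a", "A"), (2, "b", "B"), (3, "c", "C"), (4, "d", "D"), (5, "e", "E")]
filtered = [(c1, c2) for (c1, c2, _c3) in df if c1 != 3]  # drops the col1 == 3 row

# Right outer join on col1: keep every df row, None-pad the left side
joined = []
for right in df:
    matches = [left for left in filtered if left[0] == right[0]]
    if matches:
        joined.extend((left, right) for left in matches)
    else:
        joined.append((None, right))

# df("col1") should resolve to the right side's col1: 1, 2, 3, 4, 5
expected = [right[0] for (_left, right) in joined]

# The bug instead resolves it to the left side's col1: 1, 2, None, 4, 5
actual = [left[0] if left is not None else None for (left, _right) in joined]
```

Here `expected` reproduces the answer the report says we should get, and `actual` reproduces the null-containing result Spark actually returns.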



