Posted to issues@spark.apache.org by "Takeshi Yamamuro (JIRA)" <ji...@apache.org> on 2018/06/16 09:45:00 UTC

[jira] [Resolved] (SPARK-24399) Reused Exchange is used where it should not be

     [ https://issues.apache.org/jira/browse/SPARK-24399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Takeshi Yamamuro resolved SPARK-24399.
--------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.3.1

> Reused Exchange is used where it should not be
> ----------------------------------------------
>
>                 Key: SPARK-24399
>                 URL: https://issues.apache.org/jira/browse/SPARK-24399
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: David Vrba
>            Priority: Critical
>              Labels: correctness
>             Fix For: 2.3.1
>
>
> Reused Exchange produces a wrong result. Here is the code to reproduce the issue:
> {code:java}
>  
> import org.apache.spark.sql.functions.{sum, lit}
> import org.apache.spark.sql.expressions.Window
> val row1 = (1, 3, 4, 50)
> val row2 = (2, 2, 2, 250)
> val row3 = (3, 2, 4, 250)
> val row4 = (4, 3, 1, 350)
> val data = Seq(row1, row2, row3, row4)
> val df = data.toDF("id", "pFilter", "secondFilter", "metricToAgg").cache()
> val w = Window.partitionBy($"id")
> val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"pFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("first_union_part"))
> val secondUnionPart = df.withColumn("activity_sum",sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"secondFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("second_union_part"))
> val finalDF = firstUnionPart.union(secondUnionPart)
> finalDF.show()
> +----------------+-----------------+
> |sum(metricToAgg)|                t|
> +----------------+-----------------+
> |             850| first_union_part|
> |             850|second_union_part|
> +----------------+-----------------+
> {code}
>  
> The second row is wrong: it should be 250 instead of 850, as you can see if you show both union parts separately:
> {code:java}
> firstUnionPart.show() 
> +----------------+----------------+ 
> |sum(metricToAgg)|               t| 
> +----------------+----------------+ 
> |             850|first_union_part| 
> +----------------+----------------+
> secondUnionPart.show()
> +----------------+-----------------+
> |sum(metricToAgg)|                t|
> +----------------+-----------------+
> |             250|second_union_part|
> +----------------+-----------------+{code}
>  
> The ReusedExchange replaced part of the query plan in the second branch of the union with the query plan from the first branch, as you can see from the output of explain().
> I did some inspection, and it appears that both sub-plans have the same canonicalized plan, which is why the ReusedExchange takes place. But I don't think they should have the same canonicalized plan: according to the notes in the source code, only plans that evaluate to the same result can have the same canonicalized plan. The two sub-plans in this query lead in principle to different results, because the second branch of the union filters on a different column than the first.
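>  
> One way such an inspection could be done is sketched here. It assumes a Spark 2.3.x spark-shell in which df, firstUnionPart, secondUnionPart and finalDF from the reproduction are already defined; the helper name exchangesOf is hypothetical, and no output is shown because it depends on the Spark version:
> {code:scala}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
>
> // Print the physical plan of the union; a ReusedExchange node in the
> // second branch indicates that the exchange of the first branch was reused.
> finalDF.explain()
>
> // Count the ReusedExchange nodes in the physical plan of the union.
> val reused = finalDF.queryExecution.executedPlan.collect {
>   case r: ReusedExchangeExec => r
> }
> println(s"ReusedExchange nodes in the union plan: ${reused.size}")
>
> // Hypothetical helper: all Exchange operators in a DataFrame's physical plan.
> def exchangesOf(df: DataFrame): Seq[Exchange] =
>   df.queryExecution.executedPlan.collect { case e: Exchange => e }
>
> // Compare the exchanges of the two branches pairwise. sameResult compares
> // the canonicalized plans, which is the equality that exchange reuse relies on.
> for ((a, b) <- exchangesOf(firstUnionPart).zip(exchangesOf(secondUnionPart))) {
>   println(s"sameResult: ${a.sameResult(b)}")
>   println(a.canonicalized)
>   println(b.canonicalized)
> }
> {code}
> If sameResult reports true for a pair of exchanges whose inputs are filtered on different columns, the canonicalized plans collide in the way described above.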
>  
> An interesting thing happens when we change the name of the second column from "pFilter" to "kFilter". In this case the query works fine and produces the correct result, as you can see here:
> {code:java}
> import org.apache.spark.sql.functions.{sum, lit}
> import org.apache.spark.sql.expressions.Window
> val row1 = (1, 3, 4, 50)
> val row2 = (2, 2, 2, 250)
> val row3 = (3, 2, 4, 250)
> val row4 = (4, 3, 1, 350)
> val data = Seq(row1, row2, row3, row4)
> val df = data.toDF("id", "kFilter", "secondFilter", "metricToAgg").cache()
> val w = Window.partitionBy($"id")
> val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"kFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("first_union_part"))
> val secondUnionPart = df.withColumn("activity_sum",sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"secondFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("second_union_part"))
> val finalDF = firstUnionPart.union(secondUnionPart)
> finalDF.show()
> +----------------+-----------------+
> |sum(metricToAgg)|                t|
> +----------------+-----------------+
> |             850| first_union_part|
> |             250|second_union_part|
> +----------------+-----------------+{code}
>  
> The result is now correct, and the only thing we changed is the name of one column. The ReusedExchange does not happen here, and I checked that the canonicalized plans now really differ.
>  
> The key points to reproduce this bug are:
>  # Use union (or some other operator with multiple branches)
>  # Use cache to get an InMemoryTableScan in the plan
>  # Use an operator that forces an Exchange in the plan (in this case a window function call)
>  # Use column names that fall in a specific alphabetical order (renaming "pFilter" to "kFilter" avoids the bug)
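>  
> Since the wrong result depends on an Exchange being reused, one possible way to test that dependency on an affected version is to switch exchange reuse off and run the union again. This is only a sketch and not something taken from the report: it assumes a spark-shell session with firstUnionPart and secondUnionPart defined as above, it relies on the internal setting spark.sql.exchange.reuse, and whether it avoids the wrong result in this particular case has not been verified here. The name checkDF is hypothetical.
> {code:scala}
> // Assumption: `spark` is the SparkSession provided by spark-shell.
> // spark.sql.exchange.reuse is an internal SQL setting (default: true).
> spark.conf.set("spark.sql.exchange.reuse", "false")
>
> // Rebuild the union so that it is planned with the new setting.
> val checkDF = firstUnionPart.union(secondUnionPart)
> checkDF.explain() // the plan should no longer contain a ReusedExchange node
> checkDF.show()
>
> // Restore the default afterwards.
> spark.conf.set("spark.sql.exchange.reuse", "true")
> {code}
> Turning reuse off makes Spark compute both exchanges separately, so it is a diagnostic step rather than a fix; the issue itself is marked as fixed in 2.3.1.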
>  
>  