You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Miguel Molina (Jira)" <ji...@apache.org> on 2019/10/22 10:09:00 UTC

[jira] [Created] (SPARK-29549) Union of DataSourceV2 datasources leads to duplicated results

Miguel Molina created SPARK-29549:
-------------------------------------

             Summary: Union of DataSourceV2 datasources leads to duplicated results
                 Key: SPARK-29549
                 URL: https://issues.apache.org/jira/browse/SPARK-29549
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.4, 2.3.3, 2.3.2, 2.3.1, 2.3.0
            Reporter: Miguel Molina


Hello!

I've discovered that when two DataSourceV2 data frames in a query of the exact same shape are joined and there is an aggregation in the query, only the first results are used. The rest get removed by the ReuseExchange rule and reuse the results of the first data frame, leading to N times the first data frame results.

 

I've put together a repository with an example project where this can be reproduced: [https://github.com/erizocosmico/spark-union-issue]

 

Basically, doing this:

 
{code:java}
val products = spark.sql("SELECT name, COUNT(*) as count FROM products GROUP BY name")
val users = spark.sql("SELECT name, COUNT(*) as count FROM users GROUP BY name")

products.union(users)
 .select("name")
 .show(truncate = false, numRows = 50){code}
 

 

Where products is:
{noformat}
+---------+---+
|name |id |
+---------+---+
|candy |1 |
|chocolate|2 |
|milk |3 |
|cinnamon |4 |
|pizza |5 |
|pineapple|6 |
+---------+---+{noformat}
And users is:
{noformat}
+-------+---+
|name |id |
+-------+---+
|andy |1 |
|alice |2 |
|mike |3 |
|mariah |4 |
|eleanor|5 |
|matthew|6 |
+-------+---+ {noformat}
 

Results are incorrect:
{noformat}
+---------+
|name |
+---------+
|candy |
|pizza |
|chocolate|
|cinnamon |
|pineapple|
|milk |
|candy |
|pizza |
|chocolate|
|cinnamon |
|pineapple|
|milk |
+---------+{noformat}
 

This is the plan explained:

 
{noformat}
== Parsed Logical Plan ==
'Project [unresolvedalias('name, None)]
+- AnalysisBarrier
 +- Union
 :- Aggregate [name#0], [name#0, count(1) AS count#8L]
 : +- SubqueryAlias products
 : +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
 +- Aggregate [name#4], [name#4, count(1) AS count#12L]
 +- SubqueryAlias users
 +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
== Analyzed Logical Plan ==
name: string
Project [name#0]
+- Union
 :- Aggregate [name#0], [name#0, count(1) AS count#8L]
 : +- SubqueryAlias products
 : +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
 +- Aggregate [name#4], [name#4, count(1) AS count#12L]
 +- SubqueryAlias users
 +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
== Optimized Logical Plan ==
Union
:- Aggregate [name#0], [name#0]
: +- Project [name#0]
: +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- Aggregate [name#4], [name#4]
 +- Project [name#4]
 +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
: +- Exchange hashpartitioning(name#0, 200)
: +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
: +- *(1) Project [name#0]
: +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
 +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
{noformat}
 

 

In the physical plan, the first exchange is reused, but it shouldn't be because both sources are not the same.

 
{noformat}
== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
: +- Exchange hashpartitioning(name#0, 200)
: +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
: +- *(1) Project [name#0]
: +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
 +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200){noformat}
 

This seems to be fixed in 2.4.x, but affects, 2.3.x versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org