Posted to issues@spark.apache.org by "Eric Liang (JIRA)" <ji...@apache.org> on 2016/07/30 22:01:20 UTC
[jira] [Created] (SPARK-16818) Exchange reuse incorrectly reuses scans over different sets of partitions
Eric Liang created SPARK-16818:
----------------------------------
Summary: Exchange reuse incorrectly reuses scans over different sets of partitions
Key: SPARK-16818
URL: https://issues.apache.org/jira/browse/SPARK-16818
Project: Spark
Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Eric Liang
Priority: Critical
Exchange reuse can produce incorrect results because the file scan operator does not take partition pruning into account in its implementation of `sameResult()`. Two scans of the same base file relation with different partition filters are therefore treated as equivalent, so queries that self-join over that relation may return wrong answers. Here's a minimal test case to reproduce:
{code}
// withTempPath and checkAnswer come from Spark's SQL test utilities.
spark.conf.set("spark.sql.exchange.reuse", true) // defaults to true in 2.0
withTempPath { path =>
  val tempDir = path.getCanonicalPath
  // Write 10 rows partitioned by a, giving partitions a=0 and a=1.
  spark.range(10)
    .selectExpr("id % 2 as a", "id % 3 as b", "id as c")
    .write
    .partitionBy("a")
    .parquet(tempDir)
  val df = spark.read.parquet(tempDir)
  // Same aggregation over two different partitions of the same relation.
  val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum")
  val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum")
  checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil)
}
{code}
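The expected rows passed to `checkAnswer` can be cross-checked without Spark. The following is a minimal plain-Scala sketch that recomputes the same aggregation and join over in-memory tuples; the object name `ExpectedRows` and its members are made up for illustration and are not part of the test suite.

```scala
// Plain-Scala cross-check of the expected rows; no Spark required.
// ExpectedRows and its members are hypothetical names for illustration.
object ExpectedRows {
  // Mirrors selectExpr("id % 2 as a", "id % 3 as b", "id as c") over range(10).
  val rows: Seq[(Long, Long, Long)] =
    (0L until 10L).map(id => (id % 2, id % 3, id))

  // Sum of c grouped by b, restricted to one value of the partition column a.
  def sumByB(a: Long): Map[Long, Long] =
    rows.filter(_._1 == a)
      .groupBy(_._2)
      .map { case (b, group) => b -> group.map(_._3).sum }

  // Inner join of the two aggregates on b, sorted by b.
  val joined: Seq[(Long, Long, Long)] = {
    val left = sumByB(0)
    val right = sumByB(1)
    left.keys.toSeq.sorted.collect {
      case b if right.contains(b) => (b, left(b), right(b))
    }
  }

  def main(args: Array[String]): Unit = joined.foreach(println)
}
```

Running `ExpectedRows.main` prints `(0,6,12)`, `(1,4,8)` and `(2,10,5)`, which agrees with the rows in the test case.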
When exchange reuse is on, the result is
{code}
+---+------+------+
|  b|sum(c)|sum(c)|
+---+------+------+
|  0|     6|     6|
|  1|     4|     4|
|  2|    10|    10|
+---+------+------+
{code}
The correct result is
{code}
+---+------+------+
|  b|sum(c)|sum(c)|
+---+------+------+
|  0|     6|    12|
|  1|     4|     8|
|  2|    10|     5|
+---+------+------+
{code}
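The duplicated `sum(c)` column in the incorrect output is what one would expect if the `a = 1` branch were served from the exchange built for `a = 0`. The toy model below sketches how an equivalence check that ignores partition filters would judge the two scans interchangeable. It is an illustration under stated assumptions only: `ToyFileScan`, `sameResultIgnoringPruning`, `sameResultWithPruning` and the path `/tmp/t` are hypothetical, and none of this is Spark's actual physical plan code or the eventual fix.

```scala
// Toy model only: ToyFileScan and both comparison helpers are hypothetical
// names for illustration, not Spark's physical plan classes.
object ReuseSketch {
  final case class ToyFileScan(relationPath: String, partitionFilters: Set[String])

  // Flawed comparison: looks only at the base relation, ignoring pruning.
  def sameResultIgnoringPruning(x: ToyFileScan, y: ToyFileScan): Boolean =
    x.relationPath == y.relationPath

  // Pruning-aware comparison: the selected partitions must match as well.
  def sameResultWithPruning(x: ToyFileScan, y: ToyFileScan): Boolean =
    x.relationPath == y.relationPath && x.partitionFilters == y.partitionFilters

  // Two scans of the same (made-up) path with different partition filters.
  val scanA0: ToyFileScan = ToyFileScan("/tmp/t", Set("a = 0"))
  val scanA1: ToyFileScan = ToyFileScan("/tmp/t", Set("a = 1"))

  def main(args: Array[String]): Unit = {
    println(sameResultIgnoringPruning(scanA0, scanA1)) // true: scans look reusable
    println(sameResultWithPruning(scanA0, scanA1))     // false: scans are kept apart
  }
}
```

In this model, any reuse decision built on the first comparison would substitute one scan for the other, while the second keeps them distinct.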