You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/09/28 20:22:49 UTC
spark git commit: [SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan
Repository: spark
Updated Branches:
refs/heads/master 46d1203bf -> a6cfa3f38
[SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan
## What changes were proposed in this pull request?
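The equality check that decides whether `RowDataSourceScanExec` nodes can be reused does not take the output schema into account. As a result, self-joins or unions over the same underlying data source can return incorrect results when the two sides select different fields. This patch adds the read schema (`ReadSchema`) to the scan's metadata. That metadata is what makes scan plans uniquely identifiable for equality checking.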
## How was this patch tested?
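A new unit test in `JDBCSuite` unions two aggregations over the same table, each reading a different column. It passes with this fix. If the exchange were incorrectly reused, the union would contain one distinct row instead of two.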
Author: Eric Liang <ek...@databricks.com>
Closes #15273 from ericl/spark-17673.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6cfa3f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6cfa3f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6cfa3f3
Branch: refs/heads/master
Commit: a6cfa3f38bcf6ba154d5ed2a53748fbc90c8872a
Parents: 46d1203
Author: Eric Liang <ek...@databricks.com>
Authored: Wed Sep 28 13:22:45 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Sep 28 13:22:45 2016 -0700
----------------------------------------------------------------------
.../spark/sql/execution/datasources/DataSourceStrategy.scala | 4 ++++
.../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 ++++++++
2 files changed, 12 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a6cfa3f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 63f01c5..693b4c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -340,6 +340,8 @@ object DataSourceStrategy extends Strategy with Logging {
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
+ // These metadata values make scan plans uniquely identifiable for equality checking.
+ // TODO(SPARK-17701) using strings for equality checking is brittle
val metadata: Map[String, String] = {
val pairs = ArrayBuffer.empty[(String, String)]
@@ -350,6 +352,8 @@ object DataSourceStrategy extends Strategy with Logging {
}
pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
}
+ pairs += ("ReadSchema" ->
+ StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
pairs.toMap
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6cfa3f3/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 10f15ca..c94cb3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -791,4 +791,12 @@ class JDBCSuite extends SparkFunSuite
val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
assert(schema.contains("`order` TEXT"))
}
+
+ test("SPARK-17673: Exchange reuse respects differences in output schema") { // regression test: scans of one source reading different columns must not share an exchange
+ val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL") // common source for both aggregations below
+ val df1 = df.groupBy("a").agg("c" -> "min") // per-a minimum of column c
+ val df2 = df.groupBy("a").agg("d" -> "min") // same source and grouping, but aggregates column d instead of c
+ val res = df1.union(df2) // both branches read inttypes; they differ only in the column they need (c vs d)
+ assert(res.distinct().count() == 2) // would be 1 if the exchange was incorrectly reused
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org