Posted to commits@spark.apache.org by we...@apache.org on 2018/12/17 05:47:20 UTC
[spark] branch branch-2.3 updated: [SPARK-26352][SQL] join reorder should not change the order of output attributes
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 1576bd7 [SPARK-26352][SQL] join reorder should not change the order of output attributes
1576bd7 is described below
commit 1576bd7b5fa0f3d24c3fa9c99f5d87668e259115
Author: Kris Mok <re...@gmail.com>
AuthorDate: Mon Dec 17 13:41:20 2018 +0800
[SPARK-26352][SQL] join reorder should not change the order of output attributes
## What changes were proposed in this pull request?
The optimizer rule `org.apache.spark.sql.catalyst.optimizer.ReorderJoin` performs join reordering on inner joins. It was introduced by SPARK-12032 (https://github.com/apache/spark/pull/10073) in December 2015.
After reordering the joins, though, the rule didn't check whether the order of the output attributes was still the same as before. As a result, the attribute order of the reordered plan can differ from the schema that a DataFrame thinks it has.
The same problem exists in the CBO version of join reordering (`CostBasedJoinReorder`) too.
The problem can be demonstrated with the following example:
```scala
spark.sql("create table table_a (x int, y int) using parquet")
spark.sql("create table table_b (i int, j int) using parquet")
spark.sql("create table table_c (a int, b int) using parquet")
val df = spark.sql("""
with df1 as (select * from table_a cross join table_b)
select * from df1 join table_c on a = x and b = i
""")
```
Here is the schema the DataFrame thinks it has:
```
scala> df.printSchema
root
|-- x: integer (nullable = true)
|-- y: integer (nullable = true)
|-- i: integer (nullable = true)
|-- j: integer (nullable = true)
|-- a: integer (nullable = true)
|-- b: integer (nullable = true)
```
Here is the output of the optimized plan after join reordering:
```
scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
|-- x: integer
|-- y: integer
|-- a: integer
|-- b: integer
|-- i: integer
|-- j: integer
```
If we exclude the `ReorderJoin` rule (using Spark 2.4's optimizer rule exclusion feature), it's back to normal:
```
scala> spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ReorderJoin")
scala> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields]
scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
|-- x: integer
|-- y: integer
|-- i: integer
|-- j: integer
|-- a: integer
|-- b: integer
```
Note that this output attribute ordering problem leads to data corruption, and can manifest itself in various symptoms:
* Silent data corruption: if the reordered columns happen to have matching or sufficiently compatible types (e.g. all fixed-length primitive types are considered "sufficiently compatible" in an `UnsafeRow`), the resulting data is wrong but might not trigger any alarms immediately.
* Weird Java-level exceptions such as `java.lang.NegativeArraySizeException`, or even SIGSEGVs.
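To make the first symptom concrete, here is a minimal sketch in plain Scala (no Spark involved) of what happens when a consumer reads a row by position according to the DataFrame's schema while the row was actually produced in the reordered column order. The object `MisorderedColumnsSketch` and its members are hypothetical names used only for illustration. The column names come from the example above, and the sample values are the ones used by the new `JoinSuite` test in this commit.

```scala
object MisorderedColumnsSketch {
  // Column order that the DataFrame's schema reports.
  val schemaOrder: Seq[String] = Seq("x", "y", "i", "j", "a", "b")

  // Column order that the optimized plan produces after join reordering.
  val physicalOrder: Seq[String] = Seq("x", "y", "a", "b", "i", "j")

  // The true value of each column for a single joined row.
  val trueValues: Map[String, Int] =
    Map("x" -> 1, "y" -> 100, "i" -> 42, "j" -> 200, "a" -> 1, "b" -> 42)

  // The row as laid out by the reordered plan.
  val physicalRow: Seq[Int] = physicalOrder.map(trueValues)

  // A consumer that trusts the schema pairs names with values by position.
  val asRead: Map[String, Int] = schemaOrder.zip(physicalRow).toMap

  // Columns whose value as read differs from the true value.
  val corrupted: Seq[String] =
    schemaOrder.filter(c => asRead(c) != trueValues(c))
}
```

Because every column in this sketch is an `Int`, nothing fails while reading. The columns `i`, `j`, `a` and `b` simply carry each other's values, which is the silent corruption described in the first bullet.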
## How was this patch tested?
Added a new unit test in `JoinReorderSuite` and a new end-to-end test in `JoinSuite`.
Also made `JoinReorderSuite` and `StarJoinReorderSuite` assert more strongly on maintaining output attribute order.
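The property these tests assert, and the repair the patch applies when the property does not hold, can be illustrated with a toy plan model. This is a simplified sketch under stated assumptions, not Catalyst code: `Attr`, `Relation`, `Join`, `Project` and `restoreOutputOrder` are hypothetical stand-ins, and attribute identity is reduced to an integer id where the CBO part of the patch uses `semanticEquals`.

```scala
object OutputOrderSketch {
  // Hypothetical stand-in for a Catalyst attribute: a name plus a unique id.
  final case class Attr(name: String, id: Int)

  // Hypothetical stand-in for a logical plan: all we need is its output.
  sealed trait Plan { def output: Seq[Attr] }

  final case class Relation(output: Seq[Attr]) extends Plan

  final case class Join(left: Plan, right: Plan) extends Plan {
    // A join outputs the left side's attributes followed by the right side's.
    def output: Seq[Attr] = left.output ++ right.output
  }

  final case class Project(projectList: Seq[Attr], child: Plan) extends Plan {
    def output: Seq[Attr] = projectList
  }

  // Positional comparison: same length and the same attribute at each index.
  def sameOutput(plan: Plan, expected: Seq[Attr]): Boolean =
    plan.output.length == expected.length &&
      plan.output.zip(expected).forall { case (a, b) => a.id == b.id }

  // Keep the reordered plan if its column order is unchanged,
  // otherwise wrap it in a projection that restores the original order.
  def restoreOutputOrder(original: Plan, reordered: Plan): Plan =
    if (sameOutput(reordered, original.output)) reordered
    else Project(original.output, reordered)
}
```

For the three-table example in this message, joining `(table_a, table_c)` before `table_b` yields the order `x, y, a, b, i, j`, so `restoreOutputOrder` would wrap the reordered joins in a `Project` over `x, y, i, j, a, b`. This mirrors the `.select(outputsOf(...): _*)` calls added to the expected plans in the test suites.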
Closes #23303 from rednaxelafx/fix-join-reorder.
Authored-by: Kris Mok <re...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 56448c662398f4c5319a337e6601450270a6a27c)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../catalyst/optimizer/CostBasedJoinReorder.scala | 10 ++++++
.../spark/sql/catalyst/optimizer/joins.scala | 12 +++++--
.../catalyst/optimizer/JoinOptimizationSuite.scala | 3 ++
.../sql/catalyst/optimizer/JoinReorderSuite.scala | 38 +++++++++++++++++++---
.../optimizer/StarJoinCostBasedReorderSuite.scala | 21 +++++++++++-
.../catalyst/optimizer/StarJoinReorderSuite.scala | 28 ++++++++++++++--
.../scala/org/apache/spark/sql/JoinSuite.scala | 14 ++++++++
7 files changed, 116 insertions(+), 10 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index 064ca68..01634a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -48,6 +48,7 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
if projectList.forall(_.isInstanceOf[Attribute]) =>
reorder(p, p.output)
}
+
// After reordering is finished, convert OrderedJoin back to Join
result transformDown {
case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, cond)
@@ -175,11 +176,20 @@ object JoinReorderDP extends PredicateHelper with Logging {
assert(topOutputSet == p.outputSet)
// Keep the same order of final output attributes.
p.copy(projectList = output)
+ case finalPlan if !sameOutput(finalPlan, output) =>
+ Project(output, finalPlan)
case finalPlan =>
finalPlan
}
}
+ private def sameOutput(plan: LogicalPlan, expectedOutput: Seq[Attribute]): Boolean = {
+ val thisOutput = plan.output
+ thisOutput.length == expectedOutput.length && thisOutput.zip(expectedOutput).forall {
+ case (a1, a2) => a1.semanticEquals(a2)
+ }
+ }
+
/** Find all possible plans at the next level, based on existing levels. */
private def searchLevel(
existingLevels: Seq[JoinPlanMap],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index edbeaf2..fedef68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -85,9 +85,9 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case ExtractFiltersAndInnerJoins(input, conditions)
+ case p @ ExtractFiltersAndInnerJoins(input, conditions)
if input.size > 2 && conditions.nonEmpty =>
- if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) {
+ val reordered = if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) {
val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, conditions)
if (starJoinPlan.nonEmpty) {
val rest = input.filterNot(starJoinPlan.contains(_))
@@ -98,6 +98,14 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
} else {
createOrderedJoin(input, conditions)
}
+
+ if (p.sameOutput(reordered)) {
+ reordered
+ } else {
+ // Reordering the joins have changed the order of the columns.
+ // Inject a projection to make sure we restore to the expected ordering.
+ Project(p.output, reordered)
+ }
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index ccd9d8d..e9438b2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -102,16 +102,19 @@ class JoinOptimizationSuite extends PlanTest {
x.join(y).join(z).where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
x.join(z, condition = Some("x.b".attr === "z.b".attr))
.join(y, condition = Some("y.d".attr === "z.a".attr))
+ .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", "z.c").map(_.attr): _*)
),
(
x.join(y, Cross).join(z, Cross)
.where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
x.join(z, Cross, Some("x.b".attr === "z.b".attr))
.join(y, Cross, Some("y.d".attr === "z.a".attr))
+ .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", "z.c").map(_.attr): _*)
),
(
x.join(y, Inner).join(z, Cross).where("x.b".attr === "z.a".attr),
x.join(z, Cross, Some("x.b".attr === "z.a".attr)).join(y, Inner)
+ .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", "z.c").map(_.attr): _*)
)
)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 2fb587d..c8a4b6d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
-import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
import org.apache.spark.sql.internal.SQLConf.{CBO_ENABLED, JOIN_REORDER_ENABLED}
@@ -133,7 +133,8 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
// the original order (t1 J t2) J t3.
val bestPlan =
t1.join(t3, Inner, Some(nameToAttr("t1.v-1-10") === nameToAttr("t3.v-1-100")))
- .join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
+ .join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
+ .select(outputsOf(t1, t2, t3): _*)
assertEqualPlans(originalPlan, bestPlan)
}
@@ -148,7 +149,9 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
val bestPlan =
t1.join(t3, Inner, Some(nameToAttr("t1.v-1-10") === nameToAttr("t3.v-1-100")))
.join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
+ .select(outputsOf(t1, t2, t3): _*) // this is redundant but we'll take it for now
.join(t4)
+ .select(outputsOf(t1, t2, t4, t3): _*)
assertEqualPlans(originalPlan, bestPlan)
}
@@ -211,6 +214,7 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
t1.join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
.join(t4.join(t3, Inner, Some(nameToAttr("t4.v-1-10") === nameToAttr("t3.v-1-100"))),
Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t4.k-1-2")))
+ .select(outputsOf(t1, t4, t2, t3): _*)
assertEqualPlans(originalPlan, bestPlan)
}
@@ -228,6 +232,23 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
}
}
+ test("SPARK-26352: join reordering should not change the order of attributes") {
+ // This test case does not rely on CBO.
+ // It's similar to the test case above, but catches a reordering bug that the one above doesn't
+ val tab1 = LocalRelation('x.int, 'y.int)
+ val tab2 = LocalRelation('i.int, 'j.int)
+ val tab3 = LocalRelation('a.int, 'b.int)
+ val original =
+ tab1.join(tab2, Cross)
+ .join(tab3, Inner, Some('a === 'x && 'b === 'i))
+ val expected =
+ tab1.join(tab3, Inner, Some('a === 'x))
+ .join(tab2, Cross, Some('b === 'i))
+ .select(outputsOf(tab1, tab2, tab3): _*)
+
+ assertEqualPlans(original, expected)
+ }
+
test("reorder recursively") {
// Original order:
// Join
@@ -275,8 +296,17 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
private def assertEqualPlans(
originalPlan: LogicalPlan,
groundTruthBestPlan: LogicalPlan): Unit = {
- val optimized = Optimize.execute(originalPlan.analyze)
+ val analyzed = originalPlan.analyze
+ val optimized = Optimize.execute(analyzed)
val expected = groundTruthBestPlan.analyze
+
+ assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect
+ assert(analyzed.sameOutput(optimized))
+
compareJoinOrder(optimized, expected)
}
+
+ private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
+ plans.map(_.output).reduce(_ ++ _)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index ada6e2a..e678f25 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -250,6 +250,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
.join(t2, Inner, Some(nameToAttr("f1_c2") === nameToAttr("t2_c1")))
.join(t1, Inner, Some(nameToAttr("f1_c1") === nameToAttr("t1_c1")))
+ .select(outputsOf(f1, t1, t2, d1, d2): _*)
assertEqualPlans(query, expected)
}
@@ -288,6 +289,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
.join(t3.join(t2, Inner, Some(nameToAttr("t2_c2") === nameToAttr("t3_c1"))), Inner,
Some(nameToAttr("d1_c2") === nameToAttr("t2_c1")))
.join(t1, Inner, Some(nameToAttr("t1_c1") === nameToAttr("f1_c1")))
+ .select(outputsOf(d1, t1, t2, f1, d2, t3): _*)
assertEqualPlans(query, expected)
}
@@ -329,6 +331,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
.join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
+ .select(outputsOf(d1, t1, t2, t3, t4, f1, d2): _*)
assertEqualPlans(query, expected)
}
@@ -379,6 +382,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
Some(nameToAttr("d3_c2") === nameToAttr("t1_c1")))
.join(t5.join(t6, Inner, Some(nameToAttr("t5_c2") === nameToAttr("t6_c2"))), Inner,
Some(nameToAttr("d2_c2") === nameToAttr("t5_c1")))
+ .select(outputsOf(d1, t3, t4, f1, d2, t5, t6, d3, t1, t2): _*)
assertEqualPlans(query, expected)
}
@@ -407,6 +411,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
f1.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+ .select(outputsOf(d1, d2, f1, d3): _*)
assertEqualPlans(query, expected)
}
@@ -432,13 +437,27 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
f1.join(t3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("t3_c1")))
.join(t2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("t2_c1")))
.join(t1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("t1_c1")))
+ .select(outputsOf(t1, f1, t2, t3): _*)
assertEqualPlans(query, expected)
}
private def assertEqualPlans( plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
- val optimized = Optimize.execute(plan1.analyze)
+ val analyzed = plan1.analyze
+ val optimized = Optimize.execute(analyzed)
val expected = plan2.analyze
+
+ assert(equivalentOutput(analyzed, expected)) // if this fails, the expected itself is incorrect
+ assert(equivalentOutput(analyzed, optimized))
+
compareJoinOrder(optimized, expected)
}
+
+ private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
+ plans.map(_.output).reduce(_ ++ _)
+ }
+
+ private def equivalentOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+ normalizeExprIds(plan1).output == normalizeExprIds(plan2).output
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 777c563..c7e97b4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -201,6 +201,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk1")))
.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
.join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+ .select(outputsOf(d1, d2, f1, d3, s3): _*)
assertEqualPlans(query, expected)
}
@@ -239,6 +240,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
.join(d2, Inner, Some(nameToAttr("f1_fk2") < nameToAttr("d2_pk1")))
.join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+ .select(outputsOf(d1, f1, d2, s3, d3): _*)
assertEqualPlans(query, expected)
}
@@ -274,7 +276,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
.join(s3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("s3_c2")))
-
+ .select(outputsOf(d1, f1, d2, s3, d3): _*)
assertEqualPlans(query, expected)
}
@@ -311,6 +313,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_c2")))
.join(s3, Inner, Some(nameToAttr("d3_fk1") < nameToAttr("s3_pk1")))
+ .select(outputsOf(d1, f1, d2, s3, d3): _*)
assertEqualPlans(query, expected)
}
@@ -414,6 +417,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d2.where(nameToAttr("d2_c2") === 2), Inner,
Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
.join(s3, Inner, Some(nameToAttr("f11_fk1") === nameToAttr("s3_pk1")))
+ .select(outputsOf(d1, f11, f1, d2, s3): _*)
assertEqualPlans(query, equivQuery)
}
@@ -449,6 +453,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d2.where(nameToAttr("d2_c2") === 2), Inner,
Some(nameToAttr("f1_fk2") === nameToAttr("d2_c4")))
.join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+ .select(outputsOf(d1, d3, f1, d2, s3): _*)
assertEqualPlans(query, expected)
}
@@ -484,6 +489,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d2.where(nameToAttr("d2_c2") === 2), Inner,
Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
.join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+ .select(outputsOf(d1, d3, f1, d2, s3): _*)
assertEqualPlans(query, expected)
}
@@ -518,6 +524,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d2.where(nameToAttr("d2_c2") === 2),
Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
.join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+ .select(outputsOf(d1, d3, f1, d2, s3): _*)
assertEqualPlans(query, expected)
}
@@ -551,6 +558,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d3, Inner, Some(nameToAttr("f1_fk3") < nameToAttr("d3_pk1")))
.join(d2, Inner, Some(nameToAttr("f1_fk2") < nameToAttr("d2_pk1")))
.join(s3, Inner, Some(nameToAttr("d3_fk1") < nameToAttr("s3_pk1")))
+ .select(outputsOf(d1, d3, f1, d2, s3): _*)
assertEqualPlans(query, expected)
}
@@ -584,13 +592,27 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
.join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+ .select(outputsOf(d1, d3, f1, d2, s3): _*)
assertEqualPlans(query, expected)
}
- private def assertEqualPlans( plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
- val optimized = Optimize.execute(plan1.analyze)
+ private def assertEqualPlans(plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
+ val analyzed = plan1.analyze
+ val optimized = Optimize.execute(analyzed)
val expected = plan2.analyze
+
+ assert(equivalentOutput(analyzed, expected)) // if this fails, the expected itself is incorrect
+ assert(equivalentOutput(analyzed, optimized))
+
compareJoinOrder(optimized, expected)
}
+
+ private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
+ plans.map(_.output).reduce(_ ++ _)
+ }
+
+ private def equivalentOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+ normalizeExprIds(plan1).output == normalizeExprIds(plan2).output
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 44767df..52fa22c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -893,4 +893,18 @@ class JoinSuite extends QueryTest with SharedSQLContext {
checkAnswer(res, Row(0, 0, 0))
}
}
+
+ test("SPARK-26352: join reordering should not change the order of columns") {
+ withTable("tab1", "tab2", "tab3") {
+ spark.sql("select 1 as x, 100 as y").write.saveAsTable("tab1")
+ spark.sql("select 42 as i, 200 as j").write.saveAsTable("tab2")
+ spark.sql("select 1 as a, 42 as b").write.saveAsTable("tab3")
+
+ val df = spark.sql("""
+ with tmp as (select * from tab1 cross join tab2)
+ select * from tmp join tab3 on a = x and b = i
+ """)
+ checkAnswer(df, Row(1, 100, 42, 200, 1, 42))
+ }
+ }
}