Posted to commits@spark.apache.org by do...@apache.org on 2020/08/10 23:26:54 UTC
[spark] branch branch-3.0 updated: [SPARK-32528][SQL][TEST][3.0] The analyze method should make sure the plan is analyzed
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 93eb567 [SPARK-32528][SQL][TEST][3.0] The analyze method should make sure the plan is analyzed
93eb567 is described below
commit 93eb5674348c00b89f0cf43123ec18ff6b90a27d
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Aug 10 16:24:08 2020 -0700
[SPARK-32528][SQL][TEST][3.0] The analyze method should make sure the plan is analyzed
### What changes were proposed in this pull request?
Backport https://github.com/apache/spark/pull/29349 to 3.0.
This PR updates the `analyze` method to make sure the returned plan is fully resolved. It also fixes several optimizer tests whose plans did not actually resolve.
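For reference, a minimal sketch of the updated DSL helper (the full change to `dsl/package.scala` appears in the diff below): run `SimpleAnalyzer`, assert the result passes `checkAnalysis`, and only then strip subquery aliases.
```scala
// Sketch of the new behavior of the LogicalPlan DSL's `analyze` (mirrors the diff below).
def analyze: LogicalPlan = {
  val analyzed = analysis.SimpleAnalyzer.execute(logicalPlan)
  // Throws AnalysisException instead of silently returning an unresolved plan.
  analysis.SimpleAnalyzer.checkAnalysis(analyzed)
  EliminateSubqueryAliases(analyzed)
}
```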
### Why are the changes needed?
It's error-prone if the `analyze` method can return an unresolved plan.
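For example (a minimal sketch mirroring the new test added to `AnalysisSuite` in the diff below; `intercept` and `testRelation` come from the existing test harness), a DSL plan that references a column the relation does not have now fails fast at `analyze` time instead of flowing into optimizer tests as an unresolved plan:
```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

// `testRelation` has no column `b`; with the stricter `analyze` this
// now throws AnalysisException rather than returning an unresolved plan.
intercept[AnalysisException] {
  testRelation.select('b).analyze
}
```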
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
test only
Closes #29400 from cloud-fan/backport.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/sql/catalyst/dsl/package.scala | 7 ++-
.../sql/catalyst/analysis/AnalysisSuite.scala | 8 +++
.../BinaryComparisonSimplificationSuite.scala | 4 +-
.../catalyst/optimizer/FilterPushdownSuite.scala | 57 ++++++++--------------
.../sql/catalyst/optimizer/JoinReorderSuite.scala | 4 +-
.../PullupCorrelatedPredicatesSuite.scala | 3 +-
.../ReplaceNullWithFalseInPredicateSuite.scala | 5 +-
.../catalyst/optimizer/SimplifyCastsSuite.scala | 9 ++--
.../optimizer/SimplifyConditionalSuite.scala | 23 +++++----
.../catalyst/optimizer/StarJoinReorderSuite.scala | 19 +++++---
10 files changed, 68 insertions(+), 71 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 26f5bee..141bcfa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -418,8 +418,11 @@ package object dsl {
def distribute(exprs: Expression*)(n: Int): LogicalPlan =
RepartitionByExpression(exprs, logicalPlan, numPartitions = n)
- def analyze: LogicalPlan =
- EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))
+ def analyze: LogicalPlan = {
+ val analyzed = analysis.SimpleAnalyzer.execute(logicalPlan)
+ analysis.SimpleAnalyzer.checkAnalysis(analyzed)
+ EliminateSubqueryAliases(analyzed)
+ }
def hint(name: String, parameters: Any*): LogicalPlan =
UnresolvedHint(name, parameters, logicalPlan)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 453a4e6..6ec341d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -25,6 +25,7 @@ import org.apache.log4j.Level
import org.scalatest.Matchers
import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -46,6 +47,13 @@ import org.apache.spark.sql.types._
class AnalysisSuite extends AnalysisTest with Matchers {
import org.apache.spark.sql.catalyst.analysis.TestRelations._
+ test("fail for unresolved plan") {
+ intercept[AnalysisException] {
+ // `testRelation` does not have column `b`.
+ testRelation.select('b).analyze
+ }
+ }
+
test("union project *") {
val plan = (1 to 120)
.map(_ => testRelation)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
index 9c71cc8..c026918 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
@@ -119,9 +119,7 @@ class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper
// GetStructField with different names are semantically equal; thus, `EqualTo(fieldA1, fieldA2)`
// will be optimized to `TrueLiteral` by `SimplifyBinaryComparison`.
- val originalQuery = nonNullableRelation
- .where(EqualTo(fieldA1, fieldA2))
- .analyze
+ val originalQuery = nonNullableRelation.where(EqualTo(fieldA1, fieldA2))
val optimized = Optimize.execute(originalQuery)
val correctAnswer = nonNullableRelation.analyze
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 70e29dc..50de3a4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.{BooleanType, IntegerType}
+import org.apache.spark.sql.types.IntegerType
import org.apache.spark.unsafe.types.CalendarInterval
class FilterPushdownSuite extends PlanTest {
@@ -664,14 +664,14 @@ class FilterPushdownSuite extends PlanTest {
val generator = Explode('c_arr)
val originalQuery = {
testRelationWithArrayType
- .generate(generator, alias = Some("arr"))
+ .generate(generator, alias = Some("arr"), outputNames = Seq("c"))
.where(('b >= 5) && ('c > 6))
}
val optimized = Optimize.execute(originalQuery.analyze)
val referenceResult = {
testRelationWithArrayType
.where('b >= 5)
- .generate(generator, alias = Some("arr"))
+ .generate(generator, alias = Some("arr"), outputNames = Seq("c"))
.where('c > 6).analyze
}
@@ -1135,49 +1135,32 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}
- test("join condition pushdown: deterministic and non-deterministic") {
- val x = testRelation.subquery('x)
- val y = testRelation.subquery('y)
-
- // Verify that all conditions except the watermark touching condition are pushed down
- // by the optimizer and others are not.
- val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && "y.a".attr === 5 &&
- "x.a".attr === Rand(10) && "y.b".attr === 5))
- val correctAnswer =
- x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5),
- condition = Some("x.a".attr === Rand(10)))
-
- // CheckAnalysis will ensure nondeterministic expressions not appear in join condition.
- // TODO support nondeterministic expressions in join condition.
- comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
- checkAnalysis = false)
- }
-
test("watermark pushdown: no pushdown on watermark attribute #1") {
val interval = new CalendarInterval(2, 2, 2000L)
+ val relation = LocalRelation(attrA, 'b.timestamp, attrC)
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
- val originalQuery = EventTimeWatermark('b, interval, testRelation)
- .where('a === 5 && 'b === 10 && 'c === 5)
+ val originalQuery = EventTimeWatermark('b, interval, relation)
+ .where('a === 5 && 'b === new java.sql.Timestamp(0) && 'c === 5)
val correctAnswer = EventTimeWatermark(
- 'b, interval, testRelation.where('a === 5 && 'c === 5))
- .where('b === 10)
+ 'b, interval, relation.where('a === 5 && 'c === 5))
+ .where('b === new java.sql.Timestamp(0))
- comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
- checkAnalysis = false)
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
}
test("watermark pushdown: no pushdown for nondeterministic filter") {
val interval = new CalendarInterval(2, 2, 2000L)
+ val relation = LocalRelation(attrA, attrB, 'c.timestamp)
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
- val originalQuery = EventTimeWatermark('c, interval, testRelation)
- .where('a === 5 && 'b === Rand(10) && 'c === 5)
+ val originalQuery = EventTimeWatermark('c, interval, relation)
+ .where('a === 5 && 'b === Rand(10) && 'c === new java.sql.Timestamp(0))
val correctAnswer = EventTimeWatermark(
- 'c, interval, testRelation.where('a === 5))
- .where('b === Rand(10) && 'c === 5)
+ 'c, interval, relation.where('a === 5))
+ .where('b === Rand(10) && 'c === new java.sql.Timestamp(0))
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
checkAnalysis = false)
@@ -1185,13 +1168,14 @@ class FilterPushdownSuite extends PlanTest {
test("watermark pushdown: full pushdown") {
val interval = new CalendarInterval(2, 2, 2000L)
+ val relation = LocalRelation(attrA, attrB, 'c.timestamp)
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
- val originalQuery = EventTimeWatermark('c, interval, testRelation)
+ val originalQuery = EventTimeWatermark('c, interval, relation)
.where('a === 5 && 'b === 10)
val correctAnswer = EventTimeWatermark(
- 'c, interval, testRelation.where('a === 5 && 'b === 10))
+ 'c, interval, relation.where('a === 5 && 'b === 10))
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
checkAnalysis = false)
@@ -1199,11 +1183,12 @@ class FilterPushdownSuite extends PlanTest {
test("watermark pushdown: no pushdown on watermark attribute #2") {
val interval = new CalendarInterval(2, 2, 2000L)
+ val relation = LocalRelation('a.timestamp, attrB, attrC)
- val originalQuery = EventTimeWatermark('a, interval, testRelation)
- .where('a === 5 && 'b === 10)
+ val originalQuery = EventTimeWatermark('a, interval, relation)
+ .where('a === new java.sql.Timestamp(0) && 'b === 10)
val correctAnswer = EventTimeWatermark(
- 'a, interval, testRelation.where('b === 10)).where('a === 5)
+ 'a, interval, relation.where('b === 10)).where('a === new java.sql.Timestamp(0))
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
checkAnalysis = false)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index f775500..36db2e2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -307,9 +307,9 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
val originalPlan2 =
t1.join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
.hint("broadcast")
- .join(t4, Inner, Some(nameToAttr("t4.v-1-10") === nameToAttr("t3.v-1-100")))
+ .join(t4, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t4.k-1-2")))
.hint("broadcast")
- .join(t3, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t4.k-1-2")))
+ .join(t3, Inner, Some(nameToAttr("t4.v-1-10") === nameToAttr("t3.v-1-100")))
assertEqualPlans(originalPlan2, originalPlan2)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
index 2d86d5a..191edc8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
@@ -91,9 +91,8 @@ class PullupCorrelatedPredicatesSuite extends PlanTest {
.select(max('d))
val scalarSubquery =
testRelation
- .where(ScalarSubquery(subPlan))
+ .where(ScalarSubquery(subPlan) === 1)
.select('a).analyze
- assert(scalarSubquery.resolved)
val optimized = Optimize.execute(scalarSubquery)
val doubleOptimized = Optimize.execute(optimized)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
index c7f42f0..eb52c5b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -50,10 +51,10 @@ class ReplaceNullWithFalseInPredicateSuite extends PlanTest {
}
test("Not expected type - replaceNullWithFalse") {
- val e = intercept[IllegalArgumentException] {
+ val e = intercept[AnalysisException] {
testFilter(originalCond = Literal(null, IntegerType), expectedCond = FalseLiteral)
}.getMessage
- assert(e.contains("but got the type `int` in `CAST(NULL AS INT)"))
+ assert(e.contains("'CAST(NULL AS INT)' of type int is not a boolean"))
}
test("replace null in branches of If") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala
index 7b3f5b0..0ccf8ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala
@@ -42,7 +42,8 @@ class SimplifyCastsSuite extends PlanTest {
test("nullable element to non-nullable element array cast") {
val input = LocalRelation('a.array(ArrayType(IntegerType, true)))
- val plan = input.select('a.cast(ArrayType(IntegerType, false)).as("casted")).analyze
+ val attr = input.output.head
+ val plan = input.select(attr.cast(ArrayType(IntegerType, false)).as("casted"))
val optimized = Optimize.execute(plan)
// Though cast from `ArrayType(IntegerType, true)` to `ArrayType(IntegerType, false)` is not
// allowed, here we just ensure that `SimplifyCasts` rule respect the plan.
@@ -60,8 +61,9 @@ class SimplifyCastsSuite extends PlanTest {
test("nullable value map to non-nullable value map cast") {
val input = LocalRelation('m.map(MapType(StringType, StringType, true)))
- val plan = input.select('m.cast(MapType(StringType, StringType, false))
- .as("casted")).analyze
+ val attr = input.output.head
+ val plan = input.select(attr.cast(MapType(StringType, StringType, false))
+ .as("casted"))
val optimized = Optimize.execute(plan)
// Though cast from `MapType(StringType, StringType, true)` to
// `MapType(StringType, StringType, false)` is not allowed, here we just ensure that
@@ -69,4 +71,3 @@ class SimplifyCastsSuite extends PlanTest {
comparePlans(optimized, plan, checkAnalysis = false)
}
}
-
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
index 8ad7c12..7b4ef54 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
@@ -18,13 +18,14 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.{IntegerType, NullType}
+import org.apache.spark.sql.types.{BooleanType, IntegerType}
class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
@@ -34,20 +35,18 @@ class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
BooleanSimplification, ConstantFolding, SimplifyConditionals) :: Nil
}
+ private val relation = LocalRelation('a.int, 'b.int, 'c.boolean)
+
protected def assertEquivalent(e1: Expression, e2: Expression): Unit = {
- val correctAnswer = Project(Alias(e2, "out")() :: Nil, OneRowRelation()).analyze
- val actual = Optimize.execute(Project(Alias(e1, "out")() :: Nil, OneRowRelation()).analyze)
+ val correctAnswer = Project(Alias(e2, "out")() :: Nil, relation).analyze
+ val actual = Optimize.execute(Project(Alias(e1, "out")() :: Nil, relation).analyze)
comparePlans(actual, correctAnswer)
}
private val trueBranch = (TrueLiteral, Literal(5))
private val normalBranch = (NonFoldableLiteral(true), Literal(10))
private val unreachableBranch = (FalseLiteral, Literal(20))
- private val nullBranch = (Literal.create(null, NullType), Literal(30))
-
- val isNotNullCond = IsNotNull(UnresolvedAttribute(Seq("a")))
- val isNullCond = IsNull(UnresolvedAttribute("b"))
- val notCond = Not(UnresolvedAttribute("c"))
+ private val nullBranch = (Literal.create(null, BooleanType), Literal(30))
test("simplify if") {
assertEquivalent(
@@ -59,7 +58,7 @@ class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
Literal(20))
assertEquivalent(
- If(Literal.create(null, NullType), Literal(10), Literal(20)),
+ If(Literal.create(null, BooleanType), Literal(10), Literal(20)),
Literal(20))
}
@@ -127,9 +126,9 @@ class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
test("simplify CaseWhen if all the outputs are semantic equivalence") {
// When the conditions in `CaseWhen` are all deterministic, `CaseWhen` can be removed.
assertEquivalent(
- CaseWhen((isNotNullCond, Subtract(Literal(3), Literal(2))) ::
- (isNullCond, Literal(1)) ::
- (notCond, Add(Literal(6), Literal(-5))) ::
+ CaseWhen(('a.isNotNull, Subtract(Literal(3), Literal(2))) ::
+ ('b.isNull, Literal(1)) ::
+ (!'c, Add(Literal(6), Literal(-5))) ::
Nil,
Add(Literal(2), Literal(-1))),
Literal(1)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 10e970d..17dc958 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -226,7 +226,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
assertEqualPlans(query, expected)
}
- test("Test 3: Star join on a subset of dimensions since join column is not unique") {
+ test("Test 3: Star join on a subset of dimensions since join column is not unique") {
// Star join:
// (=) (=)
// d1 - f1 - d2
@@ -254,9 +254,9 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
val expected =
f1.join(d1.where(nameToAttr("d1_c2") === 2), Inner,
Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk1")))
- .join(d3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
- .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
- .join(s3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("s3_c2")))
+ .join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
+ .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_c4")))
+ .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
.select(outputsOf(d1, f1, d2, s3, d3): _*)
assertEqualPlans(query, expected)
@@ -316,20 +316,23 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
// Positional join reordering: d3_ns, f1, d1, d2, s3
// Star join reordering: empty
+ val d3_pk1 = d3_ns.output.find(_.name == "d3_pk1").get
+ val d3_fk1 = d3_ns.output.find(_.name == "d3_fk1").get
+
val query =
d3_ns.join(f1).join(d1).join(d2).join(s3)
.where((nameToAttr("f1_fk2") === nameToAttr("d2_pk1")) &&
(nameToAttr("d2_c2") === 2) &&
(nameToAttr("f1_fk1") === nameToAttr("d1_pk1")) &&
- (nameToAttr("f1_fk3") === nameToAttr("d3_pk1")) &&
- (nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+ (nameToAttr("f1_fk3") === d3_pk1) &&
+ (d3_fk1 === nameToAttr("s3_pk1")))
val equivQuery =
- d3_ns.join(f1, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
+ d3_ns.join(f1, Inner, Some(nameToAttr("f1_fk3") === d3_pk1))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk1")))
.join(d2.where(nameToAttr("d2_c2") === 2), Inner,
Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
- .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+ .join(s3, Inner, Some(d3_fk1 === nameToAttr("s3_pk1")))
assertEqualPlans(query, equivQuery)
}