Posted to commits@spark.apache.org by do...@apache.org on 2020/08/10 23:26:54 UTC

[spark] branch branch-3.0 updated: [SPARK-32528][SQL][TEST][3.0] The analyze method should make sure the plan is analyzed

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 93eb567  [SPARK-32528][SQL][TEST][3.0] The analyze method should make sure the plan is analyzed
93eb567 is described below

commit 93eb5674348c00b89f0cf43123ec18ff6b90a27d
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Aug 10 16:24:08 2020 -0700

    [SPARK-32528][SQL][TEST][3.0] The analyze method should make sure the plan is analyzed
    
    ### What changes were proposed in this pull request?
    
    Backport https://github.com/apache/spark/pull/29349 to branch-3.0.
    This PR updates the `analyze` method to make sure the plan can be resolved, and fixes several optimizer tests that were building unresolvable plans.
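    
    Concretely, the DSL's `analyze` helper now runs `checkAnalysis` before stripping subquery aliases. A minimal sketch of the updated method (matching the diff to `dsl/package.scala` below):
    
    ```scala
    def analyze: LogicalPlan = {
      val analyzed = analysis.SimpleAnalyzer.execute(logicalPlan)
      // Fail fast if the plan still contains unresolved references,
      // instead of handing an unresolved plan back to the caller.
      analysis.SimpleAnalyzer.checkAnalysis(analyzed)
      EliminateSubqueryAliases(analyzed)
    }
    ```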
    
    ### Why are the changes needed?
    
    It's error-prone if the `analyze` method can return an unresolved plan.
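    
    For example, a test plan that references a non-existent column used to slip through `analyze` unresolved and only fail later, inside the optimizer rule under test, far from the actual mistake. With this change the DSL fails fast. A sketch mirroring the new test added to `AnalysisSuite` (where `testRelation` has no column `b`):
    
    ```scala
    intercept[AnalysisException] {
      // `testRelation` does not have a column `b`, so `checkAnalysis`
      // raises an AnalysisException here rather than returning an
      // unresolved plan.
      testRelation.select('b).analyze
    }
    ```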
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Test only: updates existing optimizer tests and adds a new test to `AnalysisSuite`.
    
    Closes #29400 from cloud-fan/backport.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/sql/catalyst/dsl/package.scala    |  7 ++-
 .../sql/catalyst/analysis/AnalysisSuite.scala      |  8 +++
 .../BinaryComparisonSimplificationSuite.scala      |  4 +-
 .../catalyst/optimizer/FilterPushdownSuite.scala   | 57 ++++++++--------------
 .../sql/catalyst/optimizer/JoinReorderSuite.scala  |  4 +-
 .../PullupCorrelatedPredicatesSuite.scala          |  3 +-
 .../ReplaceNullWithFalseInPredicateSuite.scala     |  5 +-
 .../catalyst/optimizer/SimplifyCastsSuite.scala    |  9 ++--
 .../optimizer/SimplifyConditionalSuite.scala       | 23 +++++----
 .../catalyst/optimizer/StarJoinReorderSuite.scala  | 19 +++++---
 10 files changed, 68 insertions(+), 71 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 26f5bee..141bcfa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -418,8 +418,11 @@ package object dsl {
       def distribute(exprs: Expression*)(n: Int): LogicalPlan =
         RepartitionByExpression(exprs, logicalPlan, numPartitions = n)
 
-      def analyze: LogicalPlan =
-        EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))
+      def analyze: LogicalPlan = {
+        val analyzed = analysis.SimpleAnalyzer.execute(logicalPlan)
+        analysis.SimpleAnalyzer.checkAnalysis(analyzed)
+        EliminateSubqueryAliases(analyzed)
+      }
 
       def hint(name: String, parameters: Any*): LogicalPlan =
         UnresolvedHint(name, parameters, logicalPlan)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 453a4e6..6ec341d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -25,6 +25,7 @@ import org.apache.log4j.Level
 import org.scalatest.Matchers
 
 import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -46,6 +47,13 @@ import org.apache.spark.sql.types._
 class AnalysisSuite extends AnalysisTest with Matchers {
   import org.apache.spark.sql.catalyst.analysis.TestRelations._
 
+  test("fail for unresolved plan") {
+    intercept[AnalysisException] {
+      // `testRelation` does not have column `b`.
+      testRelation.select('b).analyze
+    }
+  }
+
   test("union project *") {
     val plan = (1 to 120)
       .map(_ => testRelation)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
index 9c71cc8..c026918 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
@@ -119,9 +119,7 @@ class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper
 
    // `GetStructField` expressions with different names are semantically equal; thus,
    // `EqualTo(fieldA1, fieldA2)` will be optimized to `TrueLiteral` by `SimplifyBinaryComparison`.
-    val originalQuery = nonNullableRelation
-        .where(EqualTo(fieldA1, fieldA2))
-        .analyze
+    val originalQuery = nonNullableRelation.where(EqualTo(fieldA1, fieldA2))
 
     val optimized = Optimize.execute(originalQuery)
     val correctAnswer = nonNullableRelation.analyze
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 70e29dc..50de3a4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.{BooleanType, IntegerType}
+import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.unsafe.types.CalendarInterval
 
 class FilterPushdownSuite extends PlanTest {
@@ -664,14 +664,14 @@ class FilterPushdownSuite extends PlanTest {
     val generator = Explode('c_arr)
     val originalQuery = {
       testRelationWithArrayType
-        .generate(generator, alias = Some("arr"))
+        .generate(generator, alias = Some("arr"), outputNames = Seq("c"))
         .where(('b >= 5) && ('c > 6))
     }
     val optimized = Optimize.execute(originalQuery.analyze)
     val referenceResult = {
       testRelationWithArrayType
         .where('b >= 5)
-        .generate(generator, alias = Some("arr"))
+        .generate(generator, alias = Some("arr"), outputNames = Seq("c"))
         .where('c > 6).analyze
     }
 
@@ -1135,49 +1135,32 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
   }
 
-  test("join condition pushdown: deterministic and non-deterministic") {
-    val x = testRelation.subquery('x)
-    val y = testRelation.subquery('y)
-
-    // Verify that all conditions except the watermark touching condition are pushed down
-    // by the optimizer and others are not.
-    val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && "y.a".attr === 5 &&
-      "x.a".attr === Rand(10) && "y.b".attr === 5))
-    val correctAnswer =
-      x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5),
-        condition = Some("x.a".attr === Rand(10)))
-
-    // CheckAnalysis will ensure nondeterministic expressions not appear in join condition.
-    // TODO support nondeterministic expressions in join condition.
-    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
-      checkAnalysis = false)
-  }
-
   test("watermark pushdown: no pushdown on watermark attribute #1") {
     val interval = new CalendarInterval(2, 2, 2000L)
+    val relation = LocalRelation(attrA, 'b.timestamp, attrC)
 
    // Verify that the optimizer pushes down all conditions except the one
    // touching the watermark attribute.
-    val originalQuery = EventTimeWatermark('b, interval, testRelation)
-      .where('a === 5 && 'b === 10 && 'c === 5)
+    val originalQuery = EventTimeWatermark('b, interval, relation)
+      .where('a === 5 && 'b === new java.sql.Timestamp(0) && 'c === 5)
     val correctAnswer = EventTimeWatermark(
-      'b, interval, testRelation.where('a === 5 && 'c === 5))
-      .where('b === 10)
+      'b, interval, relation.where('a === 5 && 'c === 5))
+      .where('b === new java.sql.Timestamp(0))
 
-    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
-      checkAnalysis = false)
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
   }
 
   test("watermark pushdown: no pushdown for nondeterministic filter") {
     val interval = new CalendarInterval(2, 2, 2000L)
+    val relation = LocalRelation(attrA, attrB, 'c.timestamp)
 
    // Verify that only the deterministic condition not touching the
    // watermark attribute is pushed down.
-    val originalQuery = EventTimeWatermark('c, interval, testRelation)
-      .where('a === 5 && 'b === Rand(10) && 'c === 5)
+    val originalQuery = EventTimeWatermark('c, interval, relation)
+      .where('a === 5 && 'b === Rand(10) && 'c === new java.sql.Timestamp(0))
     val correctAnswer = EventTimeWatermark(
-      'c, interval, testRelation.where('a === 5))
-      .where('b === Rand(10) && 'c === 5)
+      'c, interval, relation.where('a === 5))
+      .where('b === Rand(10) && 'c === new java.sql.Timestamp(0))
 
     comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
       checkAnalysis = false)
@@ -1185,13 +1168,14 @@ class FilterPushdownSuite extends PlanTest {
 
   test("watermark pushdown: full pushdown") {
     val interval = new CalendarInterval(2, 2, 2000L)
+    val relation = LocalRelation(attrA, attrB, 'c.timestamp)
 
    // Verify that all conditions are pushed below the watermark operator,
    // since none of them touches the watermark attribute.
-    val originalQuery = EventTimeWatermark('c, interval, testRelation)
+    val originalQuery = EventTimeWatermark('c, interval, relation)
       .where('a === 5 && 'b === 10)
     val correctAnswer = EventTimeWatermark(
-      'c, interval, testRelation.where('a === 5 && 'b === 10))
+      'c, interval, relation.where('a === 5 && 'b === 10))
 
     comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
       checkAnalysis = false)
@@ -1199,11 +1183,12 @@ class FilterPushdownSuite extends PlanTest {
 
   test("watermark pushdown: no pushdown on watermark attribute #2") {
     val interval = new CalendarInterval(2, 2, 2000L)
+    val relation = LocalRelation('a.timestamp, attrB, attrC)
 
-    val originalQuery = EventTimeWatermark('a, interval, testRelation)
-      .where('a === 5 && 'b === 10)
+    val originalQuery = EventTimeWatermark('a, interval, relation)
+      .where('a === new java.sql.Timestamp(0) && 'b === 10)
     val correctAnswer = EventTimeWatermark(
-      'a, interval, testRelation.where('b === 10)).where('a === 5)
+      'a, interval, relation.where('b === 10)).where('a === new java.sql.Timestamp(0))
 
     comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
       checkAnalysis = false)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index f775500..36db2e2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -307,9 +307,9 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     val originalPlan2 =
       t1.join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
         .hint("broadcast")
-        .join(t4, Inner, Some(nameToAttr("t4.v-1-10") === nameToAttr("t3.v-1-100")))
+        .join(t4, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t4.k-1-2")))
         .hint("broadcast")
-        .join(t3, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t4.k-1-2")))
+        .join(t3, Inner, Some(nameToAttr("t4.v-1-10") === nameToAttr("t3.v-1-100")))
 
     assertEqualPlans(originalPlan2, originalPlan2)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
index 2d86d5a..191edc8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
@@ -91,9 +91,8 @@ class PullupCorrelatedPredicatesSuite extends PlanTest {
         .select(max('d))
     val scalarSubquery =
       testRelation
-        .where(ScalarSubquery(subPlan))
+        .where(ScalarSubquery(subPlan) === 1)
         .select('a).analyze
-    assert(scalarSubquery.resolved)
 
     val optimized = Optimize.execute(scalarSubquery)
     val doubleOptimized = Optimize.execute(optimized)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
index c7f42f0..eb52c5b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -50,10 +51,10 @@ class ReplaceNullWithFalseInPredicateSuite extends PlanTest {
   }
 
   test("Not expected type - replaceNullWithFalse") {
-    val e = intercept[IllegalArgumentException] {
+    val e = intercept[AnalysisException] {
       testFilter(originalCond = Literal(null, IntegerType), expectedCond = FalseLiteral)
     }.getMessage
-    assert(e.contains("but got the type `int` in `CAST(NULL AS INT)"))
+    assert(e.contains("'CAST(NULL AS INT)' of type int is not a boolean"))
   }
 
   test("replace null in branches of If") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala
index 7b3f5b0..0ccf8ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala
@@ -42,7 +42,8 @@ class SimplifyCastsSuite extends PlanTest {
 
   test("nullable element to non-nullable element array cast") {
     val input = LocalRelation('a.array(ArrayType(IntegerType, true)))
-    val plan = input.select('a.cast(ArrayType(IntegerType, false)).as("casted")).analyze
+    val attr = input.output.head
+    val plan = input.select(attr.cast(ArrayType(IntegerType, false)).as("casted"))
     val optimized = Optimize.execute(plan)
    // Though a cast from `ArrayType(IntegerType, true)` to `ArrayType(IntegerType, false)` is not
    // allowed, here we just ensure that the `SimplifyCasts` rule leaves the plan unchanged.
@@ -60,8 +61,9 @@ class SimplifyCastsSuite extends PlanTest {
 
   test("nullable value map to non-nullable value map cast") {
     val input = LocalRelation('m.map(MapType(StringType, StringType, true)))
-    val plan = input.select('m.cast(MapType(StringType, StringType, false))
-      .as("casted")).analyze
+    val attr = input.output.head
+    val plan = input.select(attr.cast(MapType(StringType, StringType, false))
+      .as("casted"))
     val optimized = Optimize.execute(plan)
    // Though a cast from `MapType(StringType, StringType, true)` to
    // `MapType(StringType, StringType, false)` is not allowed, here we just ensure that
@@ -69,4 +71,3 @@ class SimplifyCastsSuite extends PlanTest {
     comparePlans(optimized, plan, checkAnalysis = false)
   }
 }
-
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
index 8ad7c12..7b4ef54 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
@@ -18,13 +18,14 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.{IntegerType, NullType}
+import org.apache.spark.sql.types.{BooleanType, IntegerType}
 
 
 class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
@@ -34,20 +35,18 @@ class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
       BooleanSimplification, ConstantFolding, SimplifyConditionals) :: Nil
   }
 
+  private val relation = LocalRelation('a.int, 'b.int, 'c.boolean)
+
   protected def assertEquivalent(e1: Expression, e2: Expression): Unit = {
-    val correctAnswer = Project(Alias(e2, "out")() :: Nil, OneRowRelation()).analyze
-    val actual = Optimize.execute(Project(Alias(e1, "out")() :: Nil, OneRowRelation()).analyze)
+    val correctAnswer = Project(Alias(e2, "out")() :: Nil, relation).analyze
+    val actual = Optimize.execute(Project(Alias(e1, "out")() :: Nil, relation).analyze)
     comparePlans(actual, correctAnswer)
   }
 
   private val trueBranch = (TrueLiteral, Literal(5))
   private val normalBranch = (NonFoldableLiteral(true), Literal(10))
   private val unreachableBranch = (FalseLiteral, Literal(20))
-  private val nullBranch = (Literal.create(null, NullType), Literal(30))
-
-  val isNotNullCond = IsNotNull(UnresolvedAttribute(Seq("a")))
-  val isNullCond = IsNull(UnresolvedAttribute("b"))
-  val notCond = Not(UnresolvedAttribute("c"))
+  private val nullBranch = (Literal.create(null, BooleanType), Literal(30))
 
   test("simplify if") {
     assertEquivalent(
@@ -59,7 +58,7 @@ class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
       Literal(20))
 
     assertEquivalent(
-      If(Literal.create(null, NullType), Literal(10), Literal(20)),
+      If(Literal.create(null, BooleanType), Literal(10), Literal(20)),
       Literal(20))
   }
 
@@ -127,9 +126,9 @@ class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
   test("simplify CaseWhen if all the outputs are semantic equivalence") {
     // When the conditions in `CaseWhen` are all deterministic, `CaseWhen` can be removed.
     assertEquivalent(
-      CaseWhen((isNotNullCond, Subtract(Literal(3), Literal(2))) ::
-        (isNullCond, Literal(1)) ::
-        (notCond, Add(Literal(6), Literal(-5))) ::
+      CaseWhen(('a.isNotNull, Subtract(Literal(3), Literal(2))) ::
+        ('b.isNull, Literal(1)) ::
+        (!'c, Add(Literal(6), Literal(-5))) ::
         Nil,
         Add(Literal(2), Literal(-1))),
       Literal(1)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 10e970d..17dc958 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -226,7 +226,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     assertEqualPlans(query, expected)
   }
 
-  test("Test 3:  Star join on a subset of dimensions since join column is not unique") {
+  test("Test 3: Star join on a subset of dimensions since join column is not unique") {
     // Star join:
     //   (=)  (=)
     // d1 - f1 - d2
@@ -254,9 +254,9 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     val expected =
       f1.join(d1.where(nameToAttr("d1_c2") === 2), Inner,
           Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk1")))
-        .join(d3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
-        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
-        .join(s3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("s3_c2")))
+        .join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
+        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_c4")))
+        .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
         .select(outputsOf(d1, f1, d2, s3, d3): _*)
 
     assertEqualPlans(query, expected)
@@ -316,20 +316,23 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     // Positional join reordering: d3_ns, f1, d1, d2, s3
     // Star join reordering: empty
 
+    val d3_pk1 = d3_ns.output.find(_.name == "d3_pk1").get
+    val d3_fk1 = d3_ns.output.find(_.name == "d3_fk1").get
+
     val query =
       d3_ns.join(f1).join(d1).join(d2).join(s3)
         .where((nameToAttr("f1_fk2") === nameToAttr("d2_pk1")) &&
           (nameToAttr("d2_c2") === 2) &&
           (nameToAttr("f1_fk1") === nameToAttr("d1_pk1")) &&
-          (nameToAttr("f1_fk3") === nameToAttr("d3_pk1")) &&
-          (nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+          (nameToAttr("f1_fk3") === d3_pk1) &&
+          (d3_fk1 === nameToAttr("s3_pk1")))
 
     val equivQuery =
-      d3_ns.join(f1, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk1")))
+      d3_ns.join(f1, Inner, Some(nameToAttr("f1_fk3") === d3_pk1))
         .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk1")))
         .join(d2.where(nameToAttr("d2_c2") === 2), Inner,
           Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk1")))
-        .join(s3, Inner, Some(nameToAttr("d3_fk1") === nameToAttr("s3_pk1")))
+        .join(s3, Inner, Some(d3_fk1 === nameToAttr("s3_pk1")))
 
     assertEqualPlans(query, equivQuery)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org