You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/14 12:58:36 UTC

[spark] branch master updated: [SPARK-42431][CONNECT] Avoid calling `LogicalPlan.output` before analysis in `Union`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cbcea0e3043 [SPARK-42431][CONNECT] Avoid calling `LogicalPlan.output` before analysis in `Union`
cbcea0e3043 is described below

commit cbcea0e304340cae5b7a822f468388e03dd4d46f
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Tue Feb 14 21:58:23 2023 +0900

    [SPARK-42431][CONNECT] Avoid calling `LogicalPlan.output` before analysis in `Union`
    
    ### What changes were proposed in this pull request?
    - Avoid calling `LogicalPlan.output` before analysis.
    - Avoid applying optimizer rules before analysis.
    - Remove the use of the optimizer rule `CombineUnions`, since it may discard the `PLAN_ID_TAG`.
    
    ### Why are the changes needed?
    Calling `LogicalPlan.output` or applying optimizer rules before analysis is not expected.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Updated the existing unit test ("Simple Union" in `SparkConnectPlannerSuite`).
    
    Closes #40008 from zhengruifeng/connect_analyze.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../sql/connect/planner/SparkConnectPlanner.scala  | 29 +++++++++++-----------
 .../connect/planner/SparkConnectPlannerSuite.scala |  2 +-
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index d509a926cdd..b0b4db48a3b 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier, FunctionIden
 import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, MultiAlias, UnresolvedAlias, UnresolvedAttribute, UnresolvedExtractValue, UnresolvedFunction, UnresolvedRegex, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
 import org.apache.spark.sql.catalyst.plans.logical
@@ -1237,37 +1236,37 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformSetOperation(u: proto.SetOperation): LogicalPlan = {
-    assert(u.hasLeftInput && u.hasRightInput, "Union must have 2 inputs")
+    if (!u.hasLeftInput || !u.hasRightInput) {
+      throw InvalidPlanInput("Set operation must have 2 inputs")
+    }
+    val leftPlan = transformRelation(u.getLeftInput)
+    val rightPlan = transformRelation(u.getRightInput)
+    val isAll = if (u.hasIsAll) u.getIsAll else false
 
     u.getSetOpType match {
       case proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT =>
         if (u.getByName) {
           throw InvalidPlanInput("Except does not support union_by_name")
         }
-        Except(transformRelation(u.getLeftInput), transformRelation(u.getRightInput), u.getIsAll)
+        Except(leftPlan, rightPlan, isAll)
       case proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT =>
         if (u.getByName) {
           throw InvalidPlanInput("Intersect does not support union_by_name")
         }
-        Intersect(
-          transformRelation(u.getLeftInput),
-          transformRelation(u.getRightInput),
-          u.getIsAll)
+        Intersect(leftPlan, rightPlan, isAll)
       case proto.SetOperation.SetOpType.SET_OP_TYPE_UNION =>
         if (!u.getByName && u.getAllowMissingColumns) {
           throw InvalidPlanInput(
             "UnionByName `allowMissingCol` can be true only if `byName` is true.")
         }
-        val combinedUnion = CombineUnions(
-          Union(
-            Seq(transformRelation(u.getLeftInput), transformRelation(u.getRightInput)),
-            byName = u.getByName,
-            allowMissingCol = u.getAllowMissingColumns))
-        if (u.getIsAll) {
-          combinedUnion
+        val union = Union(Seq(leftPlan, rightPlan), u.getByName, u.getAllowMissingColumns)
+        if (isAll) {
+          union
         } else {
-          logical.Deduplicate(combinedUnion.output, combinedUnion)
+          val analyzed = new QueryExecution(session, union).analyzed
+          Deduplicate(analyzed.output, analyzed)
         }
+
       case _ =>
         throw InvalidPlanInput(s"Unsupported set operation ${u.getSetOpTypeValue}")
     }
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index d8baa182e5a..7470d539787 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -190,7 +190,7 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
   }
 
   test("Simple Union") {
-    intercept[AssertionError](
+    intercept[InvalidPlanInput](
       transform(proto.Relation.newBuilder.setSetOp(proto.SetOperation.newBuilder.build()).build))
     val union = proto.Relation.newBuilder
       .setSetOp(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org