Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/15 11:32:25 UTC

[spark] branch branch-3.4 updated: [SPARK-42431][CONNECT][FOLLOWUP] Use `Distinct` to delay analysis for `Union`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 835f9b7dcf3 [SPARK-42431][CONNECT][FOLLOWUP] Use `Distinct` to delay analysis for `Union`
835f9b7dcf3 is described below

commit 835f9b7dcf382d72ced9a92828e68e647c20c130
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Wed Feb 15 20:31:26 2023 +0900

    [SPARK-42431][CONNECT][FOLLOWUP] Use `Distinct` to delay analysis for `Union`
    
    ### What changes were proposed in this pull request?
    Use `Distinct` instead of `Deduplicate` for the non-`UNION ALL` case.
    
    ### Why are the changes needed?
    To delay analysis; see https://github.com/apache/spark/pull/40008#discussion_r1106529796 for the discussion.
    
    ### Does this PR introduce _any_ user-facing change?
    The plan shown in `explain` may change.
    
    ### How was this patch tested?
    Updated the existing unit tests.
    
    Closes #40029 from zhengruifeng/connect_analyze_distinct.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 28153bed56a198882fbfc9068b6bba6fe3be338a)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala    |  3 +--
 .../spark/sql/connect/planner/SparkConnectProtoSuite.scala | 14 +++++++++-----
 2 files changed, 10 insertions(+), 7 deletions(-)
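
As a quick orientation before the diff: the planner change boils down to the sketch below. This is a simplified rendition of the `isAll = false` branch, not the literal SparkConnectPlanner source; `union`, `session`, and the `logical` package alias are taken from the hunk that follows.

    if (isAll) {
      union  // UNION ALL: pass the plan through unchanged
    } else {
      // Before: the union was analyzed eagerly so Deduplicate could be handed
      // its resolved output attributes:
      //   val analyzed = new QueryExecution(session, union).analyzed
      //   Deduplicate(analyzed.output, analyzed)
      // After: wrap the still-unresolved union and let the regular analysis
      // phase resolve it later.
      logical.Distinct(union)
    }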

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b0b4db48a3b..57bd28c0c4f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1263,8 +1263,7 @@ class SparkConnectPlanner(val session: SparkSession) {
         if (isAll) {
           union
         } else {
-          val analyzed = new QueryExecution(session, union).analyzed
-          Deduplicate(analyzed.output, analyzed)
+          logical.Distinct(union)
         }
 
       case _ =>
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index ca087f357cb..e94b6d137cd 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -30,8 +30,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row, SaveMode
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.connect.dsl.MockRemoteSession
 import org.apache.spark.sql.connect.dsl.commands._
 import org.apache.spark.sql.connect.dsl.expressions._
@@ -370,7 +369,7 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(connectPlan5, sparkPlan5)
 
     val connectPlan6 = connectTestRelation.union(connectTestRelation, isAll = false)
-    val sparkPlan6 = sparkTestRelation.union(sparkTestRelation).distinct()
+    val sparkPlan6 = Distinct(sparkTestRelation.union(sparkTestRelation).logicalPlan)
     comparePlans(connectPlan6, sparkPlan6)
 
     val connectPlan7 =
@@ -380,7 +379,7 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
 
     val connectPlan8 =
       connectTestRelation.union(connectTestRelation2, isAll = false, byName = true)
-    val sparkPlan8 = sparkTestRelation.unionByName(sparkTestRelation2).distinct()
+    val sparkPlan8 = Distinct(sparkTestRelation.unionByName(sparkTestRelation2).logicalPlan)
     comparePlans(connectPlan8, sparkPlan8)
   }
 
@@ -963,7 +962,12 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
 
   // Compares proto plan with DataFrame.
   private def comparePlans(connectPlan: proto.Relation, sparkPlan: DataFrame): Unit = {
+    comparePlans(connectPlan, sparkPlan.queryExecution.analyzed)
+  }
+
+  // Compares proto plan with LogicalPlan.
+  private def comparePlans(connectPlan: proto.Relation, sparkPlan: LogicalPlan): Unit = {
     val connectAnalyzed = analyzePlan(transform(connectPlan))
-    comparePlans(connectAnalyzed, sparkPlan.queryExecution.analyzed, false)
+    comparePlans(connectAnalyzed, sparkPlan, false)
   }
 }
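
To illustrate why `Distinct` lets the planner delay analysis while `Deduplicate` did not, here is a minimal, self-contained Scala sketch against the Catalyst API. It is an illustration only; the table names are made up, and the real Connect planner builds its children from proto relations rather than `UnresolvedRelation`.

    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Union}

    // Two not-yet-analyzed children, standing in for plans produced from
    // Connect proto relations (table names are hypothetical).
    val left  = UnresolvedRelation(Seq("left_table"))
    val right = UnresolvedRelation(Seq("right_table"))
    val union = Union(left, right)

    // Distinct only needs the child plan, so it can wrap the unresolved union;
    // attribute resolution happens later, when the analyzer runs over the
    // whole plan.
    val distinct = Distinct(union)
    assert(!distinct.resolved)

    // Deduplicate(keys, child), by contrast, needs the child's resolved output
    // attributes up front:
    //   Deduplicate(union.output, union)
    // which is why the previous code had to run a QueryExecution inside the
    // planner just to obtain them.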


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org