You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/15 11:32:25 UTC
[spark] branch branch-3.4 updated: [SPARK-42431][CONNECT][FOLLOWUP] Use `Distinct` to delay analysis for `Union`
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 835f9b7dcf3 [SPARK-42431][CONNECT][FOLLOWUP] Use `Distinct` to delay analysis for `Union`
835f9b7dcf3 is described below
commit 835f9b7dcf382d72ced9a92828e68e647c20c130
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Wed Feb 15 20:31:26 2023 +0900
[SPARK-42431][CONNECT][FOLLOWUP] Use `Distinct` to delay analysis for `Union`
### What changes were proposed in this pull request?
Use `Distinct` instead of `Deduplicate`.
### Why are the changes needed?
To delay analysis; see https://github.com/apache/spark/pull/40008#discussion_r1106529796
### Does this PR introduce _any_ user-facing change?
The plan shown in `explain` may change.
### How was this patch tested?
Updated the existing unit tests.
Closes #40029 from zhengruifeng/connect_analyze_distinct.
Authored-by: Ruifeng Zheng <ru...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit 28153bed56a198882fbfc9068b6bba6fe3be338a)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../spark/sql/connect/planner/SparkConnectPlanner.scala | 3 +--
.../spark/sql/connect/planner/SparkConnectProtoSuite.scala | 14 +++++++++-----
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b0b4db48a3b..57bd28c0c4f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1263,8 +1263,7 @@ class SparkConnectPlanner(val session: SparkSession) {
if (isAll) {
union
} else {
- val analyzed = new QueryExecution(session, union).analyzed
- Deduplicate(analyzed.output, analyzed)
+ logical.Distinct(union)
}
case _ =>
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index ca087f357cb..e94b6d137cd 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -30,8 +30,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row, SaveMode
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, LocalRelation, LogicalPlan}
import org.apache.spark.sql.connect.dsl.MockRemoteSession
import org.apache.spark.sql.connect.dsl.commands._
import org.apache.spark.sql.connect.dsl.expressions._
@@ -370,7 +369,7 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
comparePlans(connectPlan5, sparkPlan5)
val connectPlan6 = connectTestRelation.union(connectTestRelation, isAll = false)
- val sparkPlan6 = sparkTestRelation.union(sparkTestRelation).distinct()
+ val sparkPlan6 = Distinct(sparkTestRelation.union(sparkTestRelation).logicalPlan)
comparePlans(connectPlan6, sparkPlan6)
val connectPlan7 =
@@ -380,7 +379,7 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
val connectPlan8 =
connectTestRelation.union(connectTestRelation2, isAll = false, byName = true)
- val sparkPlan8 = sparkTestRelation.unionByName(sparkTestRelation2).distinct()
+ val sparkPlan8 = Distinct(sparkTestRelation.unionByName(sparkTestRelation2).logicalPlan)
comparePlans(connectPlan8, sparkPlan8)
}
@@ -963,7 +962,12 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
// Compares proto plan with DataFrame.
private def comparePlans(connectPlan: proto.Relation, sparkPlan: DataFrame): Unit = {
+ comparePlans(connectPlan, sparkPlan.queryExecution.analyzed)
+ }
+
+ // Compares proto plan with LogicalPlan.
+ private def comparePlans(connectPlan: proto.Relation, sparkPlan: LogicalPlan): Unit = {
val connectAnalyzed = analyzePlan(transform(connectPlan))
- comparePlans(connectAnalyzed, sparkPlan.queryExecution.analyzed, false)
+ comparePlans(connectAnalyzed, sparkPlan, false)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org