You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/20 08:41:41 UTC
[spark] branch branch-3.4 updated: [SPARK-41959][SQL] Improve v1 writes with empty2null
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 5d617e347c3 [SPARK-41959][SQL] Improve v1 writes with empty2null
5d617e347c3 is described below
commit 5d617e347c358114b1cba9426dd854e68dcadeef
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Mon Feb 20 16:41:09 2023 +0800
[SPARK-41959][SQL] Improve v1 writes with empty2null
### What changes were proposed in this pull request?
Clean up some unnecessary `Empty2Null` related code
### Why are the changes needed?
V1Writes checks idempotency using WriteFiles, so it's unnecessary to check again whether an empty2null projection already exists.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
pass CI
Closes #39475 from ulysses-you/SPARK-41959.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 547737b82dfee7e800930fd91bf2761263f68881)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/datasources/FileFormatWriter.scala | 9 ++-------
.../org/apache/spark/sql/execution/datasources/V1Writes.scala | 10 ++--------
2 files changed, 4 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 2491c9d7754..8321b1fac71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -206,13 +206,8 @@ object FileFormatWriter extends Logging {
partitionColumns: Seq[Attribute],
sortColumns: Seq[Attribute],
orderingMatched: Boolean): Set[String] = {
- val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
- val empty2NullPlan = if (hasEmpty2Null) {
- plan
- } else {
- val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
- if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
- }
+ val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
+ val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
writeAndCommit(job, description, committer) {
val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index b17d72b0f72..b1d2588ede6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -93,13 +93,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
}
private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = {
- val hasEmpty2Null = query.exists(p => hasEmptyToNull(p.expressions))
- val empty2NullPlan = if (hasEmpty2Null) {
- query
- } else {
- val projectList = convertEmptyToNull(query.output, write.partitionColumns)
- if (projectList.isEmpty) query else Project(projectList, query)
- }
+ val projectList = convertEmptyToNull(query.output, write.partitionColumns)
+ val empty2NullPlan = if (projectList.isEmpty) query else Project(projectList, query)
assert(empty2NullPlan.output.length == query.output.length)
val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))
@@ -108,7 +103,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
case a: Attribute => attrMap.getOrElse(a, a)
}.asInstanceOf[SortOrder])
val outputOrdering = query.outputOrdering
- // Check if the ordering is already matched to ensure the idempotency of the rule.
val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
if (orderingMatched) {
empty2NullPlan
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org