You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/20 08:41:41 UTC

[spark] branch branch-3.4 updated: [SPARK-41959][SQL] Improve v1 writes with empty2null

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 5d617e347c3 [SPARK-41959][SQL] Improve v1 writes with empty2null
5d617e347c3 is described below

commit 5d617e347c358114b1cba9426dd854e68dcadeef
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Mon Feb 20 16:41:09 2023 +0800

    [SPARK-41959][SQL] Improve v1 writes with empty2null
    
    ### What changes were proposed in this pull request?
    
    Cleanup some unnecessary `Empty2Null` related code
    
    ### Why are the changes needed?
    
    V1Writes checked idempotency using WriteFiles, so it's unnecessary to check again whether empty2null already exists.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    pass CI
    
    Closes #39475 from ulysses-you/SPARK-41959.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 547737b82dfee7e800930fd91bf2761263f68881)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/datasources/FileFormatWriter.scala     |  9 ++-------
 .../org/apache/spark/sql/execution/datasources/V1Writes.scala  | 10 ++--------
 2 files changed, 4 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 2491c9d7754..8321b1fac71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -206,13 +206,8 @@ object FileFormatWriter extends Logging {
       partitionColumns: Seq[Attribute],
       sortColumns: Seq[Attribute],
       orderingMatched: Boolean): Set[String] = {
-    val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
-    val empty2NullPlan = if (hasEmpty2Null) {
-      plan
-    } else {
-      val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
-      if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
-    }
+    val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
+    val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
 
     writeAndCommit(job, description, committer) {
       val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index b17d72b0f72..b1d2588ede6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -93,13 +93,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
   }
 
   private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = {
-    val hasEmpty2Null = query.exists(p => hasEmptyToNull(p.expressions))
-    val empty2NullPlan = if (hasEmpty2Null) {
-      query
-    } else {
-      val projectList = convertEmptyToNull(query.output, write.partitionColumns)
-      if (projectList.isEmpty) query else Project(projectList, query)
-    }
+    val projectList = convertEmptyToNull(query.output, write.partitionColumns)
+    val empty2NullPlan = if (projectList.isEmpty) query else Project(projectList, query)
     assert(empty2NullPlan.output.length == query.output.length)
     val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))
 
@@ -108,7 +103,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
       case a: Attribute => attrMap.getOrElse(a, a)
     }.asInstanceOf[SortOrder])
     val outputOrdering = query.outputOrdering
-    // Check if the ordering is already matched to ensure the idempotency of the rule.
     val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
     if (orderingMatched) {
       empty2NullPlan


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org