Posted to commits@spark.apache.org by we...@apache.org on 2020/04/02 08:36:14 UTC

[spark] branch branch-3.0 updated: [SPARK-31321][SQL] Remove SaveMode check in v2 FileWriteBuilder

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2386a15  [SPARK-31321][SQL] Remove SaveMode check in v2 FileWriteBuilder
2386a15 is described below

commit 2386a15344e43166e440753fbad5490c8aee6473
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Thu Apr 2 08:34:36 2020 +0000

    [SPARK-31321][SQL] Remove SaveMode check in v2 FileWriteBuilder
    
    ### What changes were proposed in this pull request?
    
    The `SaveMode` is resolved before we create `FileWriteBuilder` to build `BatchWrite`.
    
    In https://github.com/apache/spark/pull/25876, we removed the save mode for DSV2 from `DataFrameWriter`. As a result, the `mode` method is never called, which makes `validateInputs` fail deterministically because `mode` is never set.
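    
    To illustrate, here is a minimal, self-contained Scala sketch
    (hypothetical names, not the actual Spark classes) of the dead pattern
    this PR removes: a builder exposes a `mode` setter that the v2 write
    path no longer calls, so its null-check assertion can only fail once
    callers stop invoking `mode(...)`:
    
        object DeadModeSketch {
          sealed trait SaveMode
          case object Overwrite extends SaveMode
    
          class WriteBuilderSketch {
            // Never assigned anymore: the v2 path stopped calling mode(...).
            private var mode: SaveMode = _
    
            def mode(m: SaveMode): WriteBuilderSketch = {
              this.mode = m
              this
            }
    
            def validateInputs(): Unit =
              assert(mode != null, "Missing save mode") // now always fails
          }
    
          def main(args: Array[String]): Unit = {
            val builder = new WriteBuilderSketch
            builder.validateInputs() // throws AssertionError: Missing save mode
          }
        }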
    
    ### Why are the changes needed?
    Remove dead code.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #28090 from yaooqinn/SPARK-31321.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 1ce584f6b78a109d63d1a5ea8b721422f9d8d6e0)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/v2/FileWriteBuilder.scala          | 32 ++--------------------
 1 file changed, 3 insertions(+), 29 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index d519832..cd62ee7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.io.IOException
 import java.util.UUID
 
 import scala.collection.JavaConverters._
@@ -27,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 
 import org.apache.spark.internal.io.FileCommitProtocol
-import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
@@ -46,12 +45,6 @@ abstract class FileWriteBuilder(
   private val schema = info.schema()
   private val queryId = info.queryId()
   private val options = info.options()
-  private var mode: SaveMode = _
-
-  def mode(mode: SaveMode): WriteBuilder = {
-    this.mode = mode
-    this
-  }
 
   override def buildForBatch(): BatchWrite = {
     val sparkSession = SparkSession.active
@@ -68,26 +61,8 @@ abstract class FileWriteBuilder(
     lazy val description =
       createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)
 
-    val fs = path.getFileSystem(hadoopConf)
-    mode match {
-      case SaveMode.ErrorIfExists if fs.exists(path) =>
-        val qualifiedOutputPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
-
-      case SaveMode.Ignore if fs.exists(path) =>
-        null
-
-      case SaveMode.Overwrite =>
-        if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
-          throw new IOException(s"Unable to clear directory $path prior to writing to it")
-        }
-        committer.setupJob(job)
-        new FileBatchWrite(job, description, committer)
-
-      case _ =>
-        committer.setupJob(job)
-        new FileBatchWrite(job, description, committer)
-    }
+    committer.setupJob(job)
+    new FileBatchWrite(job, description, committer)
   }
 
   /**
@@ -104,7 +79,6 @@ abstract class FileWriteBuilder(
   private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
     assert(schema != null, "Missing input data schema")
     assert(queryId != null, "Missing query ID")
-    assert(mode != null, "Missing save mode")
 
     if (paths.length != 1) {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +

