You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/05/24 17:46:06 UTC

[spark] branch master updated: [SPARK-26356][SQL] remove SaveMode from data source v2

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d318bf  [SPARK-26356][SQL] remove SaveMode from data source v2
7d318bf is described below

commit 7d318bfe907ab22b904d118e4ff4970af32b0e44
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri May 24 10:45:46 2019 -0700

    [SPARK-26356][SQL] remove SaveMode from data source v2
    
    ## What changes were proposed in this pull request?
    
    In data source v1, save mode specified in `DataFrameWriter` is passed to data source implementation directly, and each data source can define its own behavior about save mode. This is confusing and we want to get rid of save mode in data source v2.
    
    For data source v2, we expect data source to implement the `TableCatalog` API, and end-users use SQL(or the new write API described in [this doc](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5ace0718#heading=h.e9v1af12g5zo)) to acess data sources. The SQL API has very clear semantic and we don't need save mode at all.
    
    However, for simple data sources that do not have table management (like a JIRA data source, a noop sink, etc.), it's not ideal to ask them to implement the `TableCatalog` API, and throw exception here and there.
    
    `TableProvider` API is created for simple data sources. It can only get tables, without any other table management methods. This means, it can only deal with existing tables.
    
    `TableProvider` fits well with `DataStreamReader` and `DataStreamWriter`, as they can only read/write existing tables. However, `TableProvider` doesn't fit `DataFrameWriter` well, as the save mode requires more than just get table. More specifically, `ErrorIfExists` mode needs to check if table exists, and create table. `Ignore` mode needs to check if table exists. When end-users specify `ErrorIfExists` or `Ignore` mode and write data to `TableProvider` via `DataFrameWriter`, Spark fa [...]
    
    The file source is in the middle of `TableProvider` and `TableCatalog`: it's simple but it can check table(path) exists and create table(path). That said, file source supports all the save modes.
    
    Currently file source implements `TableProvider`, and it's not working because `TableProvider` doesn't support `ErrorIfExists` and `Ignore` modes. Ideally we should create a new API for path-based data sources, but to unblock the work of file source v2 migration, this PR proposes to special-case file source v2 in `DataFrameWriter`, to make it work.
    
    This PR also removes `SaveMode` from data source v2, as now only the internal file source v2 needs it.
    
    ## How was this patch tested?
    
    existing tests
    
    Closes #24233 from cloud-fan/file.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../apache/spark/sql/sources/v2/TableProvider.java |  4 ++
 .../sql/sources/v2/writer/SupportsSaveMode.java    | 26 --------
 .../spark/sql/sources/v2/writer/WriteBuilder.java  |  4 --
 .../org/apache/spark/sql/DataFrameWriter.scala     | 77 ++++++++++++----------
 .../datasources/noop/NoopDataSource.scala          |  6 +-
 .../datasources/v2/FileWriteBuilder.scala          |  7 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 35 +++-------
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   | 44 ++++---------
 .../sources/v2/FileDataSourceV2FallBackSuite.scala |  4 +-
 .../sql/sources/v2/SimpleWritableDataSource.scala  | 23 ++-----
 .../sql/test/DataFrameReaderWriterSuite.scala      | 72 ++++++++++++++++++--
 11 files changed, 147 insertions(+), 155 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
index 04ad8fd..0e2eb9c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
@@ -26,6 +26,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  * The base interface for v2 data sources which don't have a real catalog. Implementations must
  * have a public, 0-arg constructor.
  * <p>
+ * Note that, TableProvider can only apply data operations to existing tables, like read, append,
+ * delete, and overwrite. It does not support the operations that require metadata changes, like
+ * create/drop tables.
+ * <p>
  * The major responsibility of this interface is to return a {@link Table} for read/write.
  * </p>
  */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
deleted file mode 100644
index c4295f2..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer;
-
-import org.apache.spark.sql.SaveMode;
-
-// A temporary mixin trait for `WriteBuilder` to support `SaveMode`. Will be removed before
-// Spark 3.0 when all the new write operators are finished. See SPARK-26356 for more details.
-public interface SupportsSaveMode extends WriteBuilder {
-  WriteBuilder mode(SaveMode mode);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
index d88b4a4..158066d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
@@ -60,10 +60,6 @@ public interface WriteBuilder {
    * exception, data sources must overwrite this method to provide an implementation, if the
    * {@link Table} that creates this write returns {@link TableCapability#BATCH_WRITE} support in
    * its {@link Table#capabilities()}.
-   *
-   * Note that, the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
-   * to indicate that no writing is needed. We can clean it up after removing
-   * {@link SupportsSaveMode}.
    */
   default BatchWrite buildForBatch() {
     throw new UnsupportedOperationException(getClass().getName() +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 18653b2..0c48ec9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -30,12 +30,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable,
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
-import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -56,13 +55,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
    * <li>`SaveMode.Append`: append the data.</li>
    * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
-   * <li>`SaveMode.ErrorIfExists`: default option, throw an exception at runtime.</li>
+   * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
    * </ul>
+   * <p>
+   * When writing to data source v1, the default option is `ErrorIfExists`. When writing to data
+   * source v2, the default option is `Append`.
    *
    * @since 1.4.0
    */
   def mode(saveMode: SaveMode): DataFrameWriter[T] = {
-    this.mode = saveMode
+    this.mode = Some(saveMode)
     this
   }
 
@@ -78,15 +80,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def mode(saveMode: String): DataFrameWriter[T] = {
-    this.mode = saveMode.toLowerCase(Locale.ROOT) match {
-      case "overwrite" => SaveMode.Overwrite
-      case "append" => SaveMode.Append
-      case "ignore" => SaveMode.Ignore
-      case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists
+    saveMode.toLowerCase(Locale.ROOT) match {
+      case "overwrite" => mode(SaveMode.Overwrite)
+      case "append" => mode(SaveMode.Append)
+      case "ignore" => mode(SaveMode.Ignore)
+      case "error" | "errorifexists" => mode(SaveMode.ErrorIfExists)
+      case "default" => this
       case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
         "Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
     }
-    this
   }
 
   /**
@@ -268,9 +270,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       provider.getTable(dsOptions) match {
+        // TODO (SPARK-27815): To not break existing tests, here we treat file source as a special
+        // case, and pass the save mode to file source directly. This hack should be removed.
+        case table: FileTable =>
+          val write = table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
+            .mode(modeForDSV1) // should not change default mode for file source.
+            .withQueryId(UUID.randomUUID().toString)
+            .withInputDataSchema(df.logicalPlan.schema)
+            .buildForBatch()
+          // The returned `Write` can be null, which indicates that we can skip writing.
+          if (write != null) {
+            runCommand(df.sparkSession, "save") {
+              WriteToDataSourceV2(write, df.logicalPlan)
+            }
+          }
+
         case table: SupportsWrite if table.supports(BATCH_WRITE) =>
           lazy val relation = DataSourceV2Relation.create(table, dsOptions)
-          mode match {
+          modeForDSV2 match {
             case SaveMode.Append =>
               runCommand(df.sparkSession, "save") {
                 AppendData.byName(relation, df.logicalPlan)
@@ -282,25 +299,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
                 OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
               }
 
-            case _ =>
-              table.newWriteBuilder(dsOptions) match {
-                case writeBuilder: SupportsSaveMode =>
-                  val write = writeBuilder.mode(mode)
-                      .withQueryId(UUID.randomUUID().toString)
-                      .withInputDataSchema(df.logicalPlan.schema)
-                      .buildForBatch()
-                  // It can only return null with `SupportsSaveMode`. We can clean it up after
-                  // removing `SupportsSaveMode`.
-                  if (write != null) {
-                    runCommand(df.sparkSession, "save") {
-                      WriteToDataSourceV2(write, df.logicalPlan)
-                    }
-                  }
-
-                case _ =>
-                  throw new AnalysisException(
-                    s"data source ${table.name} does not support SaveMode $mode")
-              }
+            case other =>
+              throw new AnalysisException(s"TableProvider implementation $source cannot be " +
+                s"written with $other mode, please use Append or Overwrite " +
+                "modes instead.")
           }
 
         // Streaming also uses the data source V2 API. So it may be that the data source implements
@@ -328,7 +330,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         sparkSession = df.sparkSession,
         className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
+        options = extraOptions.toMap).planForWriting(modeForDSV1, df.logicalPlan)
     }
   }
 
@@ -377,7 +379,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
         query = df.logicalPlan,
-        overwrite = mode == SaveMode.Overwrite,
+        overwrite = modeForDSV1 == SaveMode.Overwrite,
         ifPartitionNotExists = false)
     }
   }
@@ -457,7 +459,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val tableIdentWithDB = tableIdent.copy(database = Some(db))
     val tableName = tableIdentWithDB.unquotedString
 
-    (tableExists, mode) match {
+    (tableExists, modeForDSV1) match {
       case (true, SaveMode.Ignore) =>
         // Do nothing
 
@@ -512,7 +514,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
       bucketSpec = getBucketSpec)
 
-    runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+    runCommand(df.sparkSession, "saveAsTable")(
+      CreateTable(tableDesc, modeForDSV1, Some(df.logicalPlan)))
   }
 
   /**
@@ -718,13 +721,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     SQLExecution.withNewExecutionId(session, qe, Some(name))(qe.toRdd)
   }
 
+  private def modeForDSV1 = mode.getOrElse(SaveMode.ErrorIfExists)
+
+  private def modeForDSV2 = mode.getOrElse(SaveMode.Append)
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
 
   private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName
 
-  private var mode: SaveMode = SaveMode.ErrorIfExists
+  private var mode: Option[SaveMode] = None
 
   private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 6b4efaf..e4f9e49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -21,7 +21,6 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
@@ -47,13 +46,12 @@ private[noop] object NoopTable extends Table with SupportsWrite {
     Set(
       TableCapability.BATCH_WRITE,
       TableCapability.STREAMING_WRITE,
+      TableCapability.TRUNCATE,
       TableCapability.ACCEPT_ANY_SCHEMA).asJava
   }
 }
 
-private[noop] object NoopWriteBuilder extends WriteBuilder
-  with SupportsSaveMode with SupportsTruncate {
-  override def mode(mode: SaveMode): WriteBuilder = this
+private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsTruncate {
   override def truncate(): WriteBuilder = this
   override def buildForBatch(): BatchWrite = NoopBatchWrite
   override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index 7ff5c41..eacc4cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
+import org.apache.spark.sql.sources.v2.writer.{BatchWrite, WriteBuilder}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.util.SchemaUtils
@@ -43,8 +43,7 @@ abstract class FileWriteBuilder(
     options: CaseInsensitiveStringMap,
     paths: Seq[String],
     formatName: String,
-    supportsDataType: DataType => Boolean)
-  extends WriteBuilder with SupportsSaveMode {
+    supportsDataType: DataType => Boolean) extends WriteBuilder {
   private var schema: StructType = _
   private var queryId: String = _
   private var mode: SaveMode = _
@@ -59,7 +58,7 @@ abstract class FileWriteBuilder(
     this
   }
 
-  override def mode(mode: SaveMode): WriteBuilder = {
+  def mode(mode: SaveMode): WriteBuilder = {
     this.mode = mode
     this
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 1797166..6c771ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,7 +26,6 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.InternalRow
@@ -36,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
 import org.apache.spark.sql.sources.v2.SupportsWrite
-import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.{LongAccumulator, Utils}
 
@@ -81,16 +80,10 @@ case class CreateTableAsSelectExec(
     Utils.tryWithSafeFinallyAndFailureCallbacks({
       catalog.createTable(ident, query.schema, partitioning.toArray, properties.asJava) match {
         case table: SupportsWrite =>
-          val builder = table.newWriteBuilder(writeOptions)
-              .withInputDataSchema(query.schema)
-              .withQueryId(UUID.randomUUID().toString)
-          val batchWrite = builder match {
-            case supportsSaveMode: SupportsSaveMode =>
-              supportsSaveMode.mode(SaveMode.Append).buildForBatch()
-
-            case _ =>
-              builder.buildForBatch()
-          }
+          val batchWrite = table.newWriteBuilder(writeOptions)
+            .withInputDataSchema(query.schema)
+            .withQueryId(UUID.randomUUID().toString)
+            .buildForBatch()
 
           doWrite(batchWrite)
 
@@ -116,13 +109,7 @@ case class AppendDataExec(
     query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
 
   override protected def doExecute(): RDD[InternalRow] = {
-    val batchWrite = newWriteBuilder() match {
-      case builder: SupportsSaveMode =>
-        builder.mode(SaveMode.Append).buildForBatch()
-
-      case builder =>
-        builder.buildForBatch()
-    }
+    val batchWrite = newWriteBuilder().buildForBatch()
     doWrite(batchWrite)
   }
 }
@@ -152,9 +139,6 @@ case class OverwriteByExpressionExec(
       case builder: SupportsTruncate if isTruncate(deleteWhere) =>
         builder.truncate().buildForBatch()
 
-      case builder: SupportsSaveMode if isTruncate(deleteWhere) =>
-        builder.mode(SaveMode.Overwrite).buildForBatch()
-
       case builder: SupportsOverwrite =>
         builder.overwrite(deleteWhere).buildForBatch()
 
@@ -185,9 +169,6 @@ case class OverwritePartitionsDynamicExec(
       case builder: SupportsDynamicOverwrite =>
         builder.overwriteDynamicPartitions().buildForBatch()
 
-      case builder: SupportsSaveMode =>
-        builder.mode(SaveMode.Overwrite).buildForBatch()
-
       case _ =>
         throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
     }
@@ -350,8 +331,8 @@ object DataWritingSparkTask extends Logging {
 }
 
 private[v2] case class DataWritingSparkTaskResult(
-                                                   numRows: Long,
-                                                   writerCommitMessage: WriterCommitMessage)
+    numRows: Long,
+    writerCommitMessage: WriterCommitMessage)
 
 /**
  * Sink progress information collected after commit.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 4e071c5..379c9c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import test.org.apache.spark.sql.sources.v2._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation}
 import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
@@ -219,14 +219,14 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
         assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
 
         spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
-          .option("path", path).save()
+          .option("path", path).mode("append").save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(10).select('id, -'id))
 
-        // test with different save modes
+        // default save mode is append
         spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
-          .option("path", path).mode("append").save()
+          .option("path", path).save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(10).union(spark.range(10)).select('id, -'id))
@@ -237,17 +237,17 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(5).select('id, -'id))
 
-        spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
-          .option("path", path).mode("ignore").save()
-        checkAnswer(
-          spark.read.format(cls.getName).option("path", path).load(),
-          spark.range(5).select('id, -'id))
+        val e = intercept[AnalysisException] {
+          spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
+            .option("path", path).mode("ignore").save()
+        }
+        assert(e.message.contains("please use Append or Overwrite modes instead"))
 
-        val e = intercept[Exception] {
+        val e2 = intercept[AnalysisException] {
           spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
             .option("path", path).mode("error").save()
         }
-        assert(e.getMessage.contains("data already exists"))
+        assert(e2.getMessage.contains("please use Append or Overwrite modes instead"))
 
         // test transaction
         val failingUdf = org.apache.spark.sql.functions.udf {
@@ -262,10 +262,10 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
         }
         // this input data will fail to read middle way.
         val input = spark.range(10).select(failingUdf('id).as('i)).select('i, -'i as 'j)
-        val e2 = intercept[SparkException] {
+        val e3 = intercept[SparkException] {
           input.write.format(cls.getName).option("path", path).mode("overwrite").save()
         }
-        assert(e2.getMessage.contains("Writing job aborted"))
+        assert(e3.getMessage.contains("Writing job aborted"))
         // make sure we don't have partial data.
         assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
       }
@@ -375,24 +375,6 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-25700: do not read schema when writing in other modes except append and overwrite") {
-    withTempPath { file =>
-      val cls = classOf[SimpleWriteOnlyDataSource]
-      val path = file.getCanonicalPath
-      val df = spark.range(5).select('id as 'i, -'id as 'j)
-      // non-append mode should not throw exception, as they don't access schema.
-      df.write.format(cls.getName).option("path", path).mode("error").save()
-      df.write.format(cls.getName).option("path", path).mode("ignore").save()
-      // append and overwrite modes will access the schema and should throw exception.
-      intercept[SchemaReadAttemptException] {
-        df.write.format(cls.getName).option("path", path).mode("append").save()
-      }
-      intercept[SchemaReadAttemptException] {
-        df.write.format(cls.getName).option("path", path).mode("overwrite").save()
-      }
-    }
-  }
-
   test("SPARK-27411: DataSourceV2Strategy should not eliminate subquery") {
     withTempView("t1") {
       val t2 = spark.read.format(classOf[SimpleDataSourceV2].getName).load()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
index 8627bdf..3ae3056 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
@@ -50,7 +50,7 @@ class DummyReadOnlyFileTable extends Table with SupportsRead {
   }
 
   override def capabilities(): java.util.Set[TableCapability] =
-    Set(TableCapability.BATCH_READ).asJava
+    Set(TableCapability.BATCH_READ, TableCapability.ACCEPT_ANY_SCHEMA).asJava
 }
 
 class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 {
@@ -73,7 +73,7 @@ class DummyWriteOnlyFileTable extends Table with SupportsWrite {
     throw new AnalysisException("Dummy file writer")
 
   override def capabilities(): java.util.Set[TableCapability] =
-    Set(TableCapability.BATCH_WRITE).asJava
+    Set(TableCapability.BATCH_WRITE, TableCapability.ACCEPT_ANY_SCHEMA).asJava
 }
 
 class FileDataSourceV2FallBackSuite extends QueryTest with SharedSQLContext {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index edebb0b..c9d2f1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.sources.v2.reader._
@@ -70,38 +69,26 @@ class SimpleWritableDataSource extends TableProvider with SessionConfigSupport {
     override def readSchema(): StructType = tableSchema
   }
 
-  class MyWriteBuilder(path: String) extends WriteBuilder with SupportsSaveMode {
+  class MyWriteBuilder(path: String) extends WriteBuilder with SupportsTruncate {
     private var queryId: String = _
-    private var mode: SaveMode = _
+    private var needTruncate = false
 
     override def withQueryId(queryId: String): WriteBuilder = {
       this.queryId = queryId
       this
     }
 
-    override def mode(mode: SaveMode): WriteBuilder = {
-      this.mode = mode
+    override def truncate(): WriteBuilder = {
+      this.needTruncate = true
       this
     }
 
     override def buildForBatch(): BatchWrite = {
-      assert(mode != null)
-
       val hadoopPath = new Path(path)
       val hadoopConf = SparkContext.getActive.get.hadoopConfiguration
       val fs = hadoopPath.getFileSystem(hadoopConf)
 
-      if (mode == SaveMode.ErrorIfExists) {
-        if (fs.exists(hadoopPath)) {
-          throw new RuntimeException("data already exists.")
-        }
-      }
-      if (mode == SaveMode.Ignore) {
-        if (fs.exists(hadoopPath)) {
-          return null
-        }
-      }
-      if (mode == SaveMode.Overwrite) {
+      if (needTruncate) {
         fs.delete(hadoopPath, true)
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index d34da33..5e6e3b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -38,11 +38,15 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.QueryExecutionListener
 import org.apache.spark.util.Utils
 
 
@@ -239,15 +243,75 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
   }
 
   test("save mode") {
-    val df = spark.read
+    spark.range(10).write
       .format("org.apache.spark.sql.test")
-      .load()
+      .mode(SaveMode.ErrorIfExists)
+      .save()
+    assert(LastOptions.saveMode === SaveMode.ErrorIfExists)
 
-    df.write
+    spark.range(10).write
+      .format("org.apache.spark.sql.test")
+      .mode(SaveMode.Append)
+      .save()
+    assert(LastOptions.saveMode === SaveMode.Append)
+
+    // By default the save mode is `ErrorIfExists` for data source v1.
+    spark.range(10).write
       .format("org.apache.spark.sql.test")
-      .mode(SaveMode.ErrorIfExists)
       .save()
     assert(LastOptions.saveMode === SaveMode.ErrorIfExists)
+
+    spark.range(10).write
+      .format("org.apache.spark.sql.test")
+      .mode("default")
+      .save()
+    assert(LastOptions.saveMode === SaveMode.ErrorIfExists)
+  }
+
+  test("save mode for data source v2") {
+    var plan: LogicalPlan = null
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        plan = qe.analyzed
+
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+    }
+
+    spark.listenerManager.register(listener)
+    try {
+      // append mode creates `AppendData`
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .mode(SaveMode.Append)
+        .save()
+      sparkContext.listenerBus.waitUntilEmpty(1000)
+      assert(plan.isInstanceOf[AppendData])
+
+      // overwrite mode creates `OverwriteByExpression`
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .mode(SaveMode.Overwrite)
+        .save()
+      sparkContext.listenerBus.waitUntilEmpty(1000)
+      assert(plan.isInstanceOf[OverwriteByExpression])
+
+      // By default the save mode is `ErrorIfExists` for data source v2.
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .save()
+      sparkContext.listenerBus.waitUntilEmpty(1000)
+      assert(plan.isInstanceOf[AppendData])
+
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .mode("default")
+        .save()
+      sparkContext.listenerBus.waitUntilEmpty(1000)
+      assert(plan.isInstanceOf[AppendData])
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
   }
 
   test("test path option in load") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org