Posted to commits@spark.apache.org by li...@apache.org on 2019/02/19 00:17:37 UTC

[spark] branch master updated: [SPARK-26785][SQL] data source v2 API refactor: streaming write

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f85ed9a  [SPARK-26785][SQL] data source v2 API refactor: streaming write
f85ed9a is described below

commit f85ed9a3e55083b0de0e20a37775efa92d248a4f
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Feb 18 16:17:24 2019 -0800

    [SPARK-26785][SQL] data source v2 API refactor: streaming write
    
    ## What changes were proposed in this pull request?
    
    Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing).
    
    The major changes:
    1. rename `StreamingWriteSupport` to `StreamingWrite`
    2. add `WriteBuilder.buildForStreaming`
    3. update existing sinks to move the creation of `StreamingWrite` to `Table`
    
    ## How was this patch tested?
    
    existing tests
    
    Closes #23702 from cloud-fan/stream-write.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../spark/sql/kafka010/KafkaSourceProvider.scala   | 42 ++++++-----
 ...riteSupport.scala => KafkaStreamingWrite.scala} |  8 +-
 .../spark/sql/sources/v2/SessionConfigSupport.java |  4 +-
 .../sources/v2/StreamingWriteSupportProvider.java  | 54 -------------
 .../spark/sql/sources/v2/SupportsBatchWrite.java   |  2 +-
 ...BatchWrite.java => SupportsStreamingWrite.java} |  9 ++-
 .../apache/spark/sql/sources/v2/TableProvider.java |  3 +-
 .../spark/sql/sources/v2/writer/WriteBuilder.java  |  9 ++-
 .../sql/sources/v2/writer/WriterCommitMessage.java |  4 +-
 .../streaming/StreamingDataWriterFactory.java      |  2 +-
 ...eamingWriteSupport.java => StreamingWrite.java} | 21 +++++-
 .../streaming/SupportsOutputMode.java}             | 17 +++--
 .../org/apache/spark/sql/DataFrameReader.scala     |  2 +-
 .../datasources/noop/NoopDataSource.scala          | 26 +++----
 .../datasources/v2/DataSourceV2StringFormat.scala  | 88 ----------------------
 .../datasources/v2/DataSourceV2Utils.scala         | 43 +++++------
 .../execution/streaming/MicroBatchExecution.scala  | 20 +++--
 .../execution/streaming/StreamingRelation.scala    |  6 +-
 .../spark/sql/execution/streaming/console.scala    | 43 ++++++++---
 .../streaming/continuous/ContinuousExecution.scala | 25 +++---
 .../streaming/continuous/EpochCoordinator.scala    |  6 +-
 .../continuous/WriteToContinuousDataSource.scala   |  6 +-
 .../WriteToContinuousDataSourceExec.scala          | 13 ++--
 ...onsoleWriteSupport.scala => ConsoleWrite.scala} |  6 +-
 ...portProvider.scala => ForeachWriterTable.scala} | 76 +++++++++++--------
 .../streaming/sources/MicroBatchWrite.scala        |  4 +-
 .../streaming/sources/RateStreamProvider.scala     |  3 +-
 .../sources/TextSocketSourceProvider.scala         |  3 +-
 .../sql/execution/streaming/sources/memoryV2.scala | 42 +++++++----
 .../spark/sql/streaming/DataStreamReader.scala     |  2 +-
 .../spark/sql/streaming/DataStreamWriter.scala     | 50 ++++++------
 .../sql/streaming/StreamingQueryManager.scala      |  4 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |  2 +-
 .../execution/streaming/MemorySinkV2Suite.scala    |  6 +-
 .../sql/sources/v2/DataSourceV2UtilsSuite.scala    |  4 +-
 .../sql/sources/v2/SimpleWritableDataSource.scala  |  3 +-
 .../ContinuousQueuedDataReaderSuite.scala          |  4 +-
 .../continuous/EpochCoordinatorSuite.scala         |  6 +-
 .../sources/StreamingDataSourceV2Suite.scala       | 70 ++++++++++-------
 39 files changed, 345 insertions(+), 393 deletions(-)
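
For orientation before reading the diff, below is a minimal, hypothetical sketch of what a sink looks like after this refactor. The object name ExampleSinkTable and the elided writer factory are illustrative only and not part of the patch; the shape mirrors the ConsoleTable and KafkaTable changes further down: the Table (not a provider mix-in) hands out a WriteBuilder, and buildForStreaming() produces the StreamingWrite.

import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

// Hypothetical sink table: after this commit the Table itself, not a
// StreamingWriteSupportProvider, is responsible for creating the streaming write.
object ExampleSinkTable extends Table with SupportsStreamingWrite {

  override def name(): String = "example-sink"

  override def schema(): StructType = StructType(Nil)

  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
    new WriteBuilder with SupportsOutputMode {
      private var inputSchema: StructType = _

      override def withInputDataSchema(schema: StructType): WriteBuilder = {
        this.inputSchema = schema
        this
      }

      // OutputMode is carried by the temporary SupportsOutputMode mix-in
      // until SPARK-26666 adds concrete write actions such as truncate.
      override def outputMode(mode: OutputMode): WriteBuilder = this

      override def buildForStreaming(): StreamingWrite = new StreamingWrite {
        // Writer factory elided; a real sink returns a factory that creates
        // per-partition DataWriters on the executors.
        override def createStreamingWriterFactory(): StreamingDataWriterFactory = ???
        override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
        override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
      }
    }
  }
}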

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 9238899b..6994517 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -47,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
-    with StreamingWriteSupportProvider
     with TableProvider
     with Logging {
   import KafkaSourceProvider._
@@ -180,20 +180,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
   }
 
-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    import scala.collection.JavaConverters._
-
-    val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
-    // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
-    val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
-
-    new KafkaStreamingWriteSupport(topic, producerParams, schema)
-  }
-
   private def strategy(caseInsensitiveParams: Map[String, String]) =
       caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
     case ("assign", value) =>
@@ -365,7 +351,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-    with SupportsMicroBatchRead with SupportsContinuousRead {
+    with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
 
     override def name(): String = s"Kafka $strategy"
 
@@ -374,6 +360,28 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
       override def build(): Scan = new KafkaScan(options)
     }
+
+    override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+      new WriteBuilder with SupportsOutputMode {
+        private var inputSchema: StructType = _
+
+        override def withInputDataSchema(schema: StructType): WriteBuilder = {
+          this.inputSchema = schema
+          this
+        }
+
+        override def outputMode(mode: OutputMode): WriteBuilder = this
+
+        override def buildForStreaming(): StreamingWrite = {
+          import scala.collection.JavaConverters._
+
+          assert(inputSchema != null)
+          val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
+          val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
+          new KafkaStreamingWrite(topic, producerParams, inputSchema)
+        }
+      }
+    }
   }
 
   class KafkaScan(options: DataSourceOptions) extends Scan {
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
similarity index 95%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
index 0d831c3..e3101e1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -33,18 +33,18 @@ import org.apache.spark.sql.types.StructType
 case object KafkaWriterCommitMessage extends WriterCommitMessage
 
 /**
- * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory.
+ * A [[StreamingWrite]] for Kafka writing. Responsible for generating the writer factory.
  *
  * @param topic The topic this writer is responsible for. If None, topic will be inferred from
  *              a `topic` field in the incoming data.
  * @param producerParams Parameters for Kafka producers in each task.
  * @param schema The schema of the input data.
  */
-class KafkaStreamingWriteSupport(
+class KafkaStreamingWrite(
     topic: Option[String],
     producerParams: ju.Map[String, Object],
     schema: StructType)
-  extends StreamingWriteSupport {
+  extends StreamingWrite {
 
   validateQuery(schema.toAttributes, producerParams, topic)
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
index c00abd9..d27fbfd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
@@ -20,12 +20,12 @@ package org.apache.spark.sql.sources.v2;
 import org.apache.spark.annotation.Evolving;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * A mix-in interface for {@link TableProvider}. Data sources can implement this interface to
  * propagate session configs with the specified key-prefix to all data source operations in this
  * session.
  */
 @Evolving
-public interface SessionConfigSupport extends DataSourceV2 {
+public interface SessionConfigSupport extends TableProvider {
 
   /**
    * Key prefix of the session configs to propagate, which is usually the data source name. Spark
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
deleted file mode 100644
index 8ac9c51..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for structured streaming.
- *
- * This interface is used to create {@link StreamingWriteSupport} instances when end users run
- * {@code Dataset.writeStream.format(...).option(...).start()}.
- */
-@Evolving
-public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
-
-  /**
-   * Creates a {@link StreamingWriteSupport} instance to save the data to this data source, which is
-   * called by Spark at the beginning of each streaming query.
-   *
-   * @param queryId A unique string for the writing query. It's possible that there are many
-   *                writing queries running at the same time, and the returned
-   *                {@link StreamingWriteSupport} can use this id to distinguish itself from others.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive epoch output means to this
-   *             sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  StreamingWriteSupport createStreamingWriteSupport(
-      String queryId,
-      StructType schema,
-      OutputMode mode,
-      DataSourceOptions options);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
index 08caadd..b2cd97a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
@@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
  * An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
  * <p>
  * If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)}  must return a {@link WriteBuilder}
+ * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
  * with {@link WriteBuilder#buildForBatch()} implemented.
  * </p>
  */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
similarity index 76%
copy from sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
copy to sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
index 08caadd..1050d35 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
@@ -18,15 +18,16 @@
 package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
 
 /**
- * An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
+ * An empty mix-in interface for {@link Table}, to indicate this table supports streaming write.
  * <p>
  * If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)}  must return a {@link WriteBuilder}
- * with {@link WriteBuilder#buildForBatch()} implemented.
+ * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
+ * with {@link WriteBuilder#buildForStreaming()} implemented.
  * </p>
  */
 @Evolving
-public interface SupportsBatchWrite extends SupportsWrite {}
+public interface SupportsStreamingWrite extends SupportsWrite, BaseStreamingSink { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
index 855d5ef..a9b83b6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
@@ -29,8 +29,7 @@ import org.apache.spark.sql.types.StructType;
  * </p>
  */
 @Evolving
-// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely.
-public interface TableProvider extends DataSourceV2 {
+public interface TableProvider {
 
   /**
    * Return a {@link Table} instance to do read/write with user-specified options.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
index e861c72..07529fe 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
@@ -20,6 +20,7 @@ package org.apache.spark.sql.sources.v2.writer;
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
 import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;
 import org.apache.spark.sql.types.StructType;
 
 /**
@@ -64,6 +65,12 @@ public interface WriteBuilder {
    * {@link SupportsSaveMode}.
    */
   default BatchWrite buildForBatch() {
-    throw new UnsupportedOperationException("Batch scans are not supported");
+    throw new UnsupportedOperationException(getClass().getName() +
+      " does not support batch write");
+  }
+
+  default StreamingWrite buildForStreaming() {
+    throw new UnsupportedOperationException(getClass().getName() +
+      " does not support streaming write");
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
index 6334c8f..23e8580 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
@@ -20,12 +20,12 @@ package org.apache.spark.sql.sources.v2.writer;
 import java.io.Serializable;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;
 
 /**
  * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
  * as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
- * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
+ * {@link StreamingWrite#commit(long, WriterCommitMessage[])}.
  *
  * This is an empty interface, data sources should define their own message class and use it when
  * generating messages at executor side and handling the messages at driver side.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
index 7d3d21c..af2f03c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
 
 /**
  * A factory of {@link DataWriter} returned by
- * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating
+ * {@link StreamingWrite#createStreamingWriterFactory()}, which is responsible for creating
  * and initializing the actual data writer at executor side.
  *
  * Note that, the writer factory will be serialized and sent to executors, then the data writer
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWrite.java
similarity index 73%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWrite.java
index 84cfbf2..5617f1c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWrite.java
@@ -22,13 +22,26 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
 /**
- * An interface that defines how to write the data to data source for streaming processing.
+ * An interface that defines how to write the data to data source in streaming queries.
  *
- * Streaming queries are divided into intervals of data called epochs, with a monotonically
- * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
+ * The writing procedure is:
+ *   1. Create a writer factory by {@link #createStreamingWriterFactory()}, serialize and send it to
+ *      all the partitions of the input data(RDD).
+ *   2. For each epoch in each partition, create the data writer, and write the data of the epoch in
+ *      the partition with this writer. If all the data are written successfully, call
+ *      {@link DataWriter#commit()}. If exception happens during the writing, call
+ *      {@link DataWriter#abort()}.
+ *   3. If writers in all partitions of one epoch are successfully committed, call
+ *      {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job failed
+ *      with an unknown reason, call {@link #abort(long, WriterCommitMessage[])}.
+ *
+ * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
+ * do it manually in their Spark applications if they want to retry.
+ *
+ * Please refer to the documentation of commit/abort methods for detailed specifications.
  */
 @Evolving
-public interface StreamingWriteSupport {
+public interface StreamingWrite {
 
   /**
    * Creates a writer factory which will be serialized and sent to executors.
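
(For reference, a condensed, hypothetical sketch of how the streaming engines drive this interface after the commit; it compresses the MicroBatchExecution and ContinuousExecution changes further down, and the helper object and parameter names are illustrative, not part of the patch.)

import java.util.UUID

import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

object StreamingWriteFlow {

  // Build the StreamingWrite the way MicroBatchExecution/ContinuousExecution do
  // after this commit: query id and input schema go through the WriteBuilder,
  // and OutputMode still travels via the temporary SupportsOutputMode mix-in.
  def buildWrite(
      sink: SupportsStreamingWrite,
      runId: UUID,
      inputSchema: StructType,
      mode: OutputMode,
      options: DataSourceOptions): StreamingWrite = {
    val builder = sink.newWriteBuilder(options)
      .withQueryId(runId.toString)
      .withInputDataSchema(inputSchema)
    builder.asInstanceOf[SupportsOutputMode].outputMode(mode).buildForStreaming()
  }

  // Driver-side epoch commit: executors run DataWriters created from the
  // serialized StreamingDataWriterFactory, and the driver then commits (or
  // aborts) the epoch as a whole with the collected commit messages.
  def commitEpoch(
      write: StreamingWrite,
      epochId: Long,
      messages: Array[WriterCommitMessage]): Unit = {
    write.commit(epochId, messages)
  }
}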
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
similarity index 67%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
index 43bdcca..832dcfa 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
@@ -15,12 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.sources.v2;
+package org.apache.spark.sql.sources.v2.writer.streaming;
 
-import org.apache.spark.annotation.Evolving;
+import org.apache.spark.annotation.Unstable;
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+import org.apache.spark.sql.streaming.OutputMode;
 
-/**
- * TODO: remove it when we finish the API refactor for streaming write side.
- */
-@Evolving
-public interface DataSourceV2 {}
+// TODO: remove it when we have `SupportsTruncate`
+@Unstable
+public interface SupportsOutputMode extends WriteBuilder {
+
+  WriteBuilder outputMode(OutputMode mode);
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 713c9a9..e757785 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -205,7 +205,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     if (classOf[TableProvider].isAssignableFrom(cls)) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-        ds = provider, conf = sparkSession.sessionState.conf)
+        source = provider, conf = sparkSession.sessionState.conf)
       val pathsOption = {
         val objectMapper = new ObjectMapper()
         DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 452ebbb..8f2072c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -30,30 +30,23 @@ import org.apache.spark.sql.types.StructType
  * This is no-op datasource. It does not do anything besides consuming its input.
  * This can be useful for benchmarking or to cache data without any additional overhead.
  */
-class NoopDataSource
-  extends DataSourceV2
-  with TableProvider
-  with DataSourceRegister
-  with StreamingWriteSupportProvider {
-
+class NoopDataSource extends TableProvider with DataSourceRegister {
   override def shortName(): String = "noop"
   override def getTable(options: DataSourceOptions): Table = NoopTable
-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = NoopStreamingWriteSupport
 }
 
-private[noop] object NoopTable extends Table with SupportsBatchWrite {
+private[noop] object NoopTable extends Table with SupportsBatchWrite with SupportsStreamingWrite {
   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = NoopWriteBuilder
   override def name(): String = "noop-table"
   override def schema(): StructType = new StructType()
 }
 
-private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsSaveMode {
-  override def buildForBatch(): BatchWrite = NoopBatchWrite
+private[noop] object NoopWriteBuilder extends WriteBuilder
+  with SupportsSaveMode with SupportsOutputMode {
   override def mode(mode: SaveMode): WriteBuilder = this
+  override def outputMode(mode: OutputMode): WriteBuilder = this
+  override def buildForBatch(): BatchWrite = NoopBatchWrite
+  override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
 }
 
 private[noop] object NoopBatchWrite extends BatchWrite {
@@ -72,7 +65,7 @@ private[noop] object NoopWriter extends DataWriter[InternalRow] {
   override def abort(): Unit = {}
 }
 
-private[noop] object NoopStreamingWriteSupport extends StreamingWriteSupport {
+private[noop] object NoopStreamingWrite extends StreamingWrite {
   override def createStreamingWriterFactory(): StreamingDataWriterFactory =
     NoopStreamingDataWriterFactory
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
@@ -85,4 +78,3 @@ private[noop] object NoopStreamingDataWriterFactory extends StreamingDataWriterF
       taskId: Long,
       epochId: Long): DataWriter[InternalRow] = NoopWriter
 }
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
deleted file mode 100644
index f11703c..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.commons.lang3.StringUtils
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.DataSourceV2
-import org.apache.spark.util.Utils
-
-/**
- * A trait that can be used by data source v2 related query plans(both logical and physical), to
- * provide a string format of the data source information for explain.
- */
-trait DataSourceV2StringFormat {
-
-  /**
-   * The instance of this data source implementation. Note that we only consider its class in
-   * equals/hashCode, not the instance itself.
-   */
-  def source: DataSourceV2
-
-  /**
-   * The output of the data source reader, w.r.t. column pruning.
-   */
-  def output: Seq[Attribute]
-
-  /**
-   * The options for this data source reader.
-   */
-  def options: Map[String, String]
-
-  /**
-   * The filters which have been pushed to the data source.
-   */
-  def pushedFilters: Seq[Expression]
-
-  private def sourceName: String = source match {
-    case registered: DataSourceRegister => registered.shortName()
-    // source.getClass.getSimpleName can cause Malformed class name error,
-    // call safer `Utils.getSimpleName` instead
-    case _ => Utils.getSimpleName(source.getClass)
-  }
-
-  def metadataString(maxFields: Int): String = {
-    val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
-
-    if (pushedFilters.nonEmpty) {
-      entries += "Filters" -> pushedFilters.mkString("[", ", ", "]")
-    }
-
-    // TODO: we should only display some standard options like path, table, etc.
-    if (options.nonEmpty) {
-      entries += "Options" -> Utils.redact(options).map {
-        case (k, v) => s"$k=$v"
-      }.mkString("[", ",", "]")
-    }
-
-    val outputStr = truncatedString(output, "[", ", ", "]", maxFields)
-
-    val entriesStr = if (entries.nonEmpty) {
-      truncatedString(entries.map {
-        case (key, value) => key + ": " + StringUtils.abbreviate(value, 100)
-      }, " (", ", ", ")", maxFields)
-    } else {
-      ""
-    }
-
-    s"$sourceName$outputStr$entriesStr"
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index e9cc399..30897d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -21,8 +21,7 @@ import java.util.regex.Pattern
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}
+import org.apache.spark.sql.sources.v2.{SessionConfigSupport, TableProvider}
 
 private[sql] object DataSourceV2Utils extends Logging {
 
@@ -34,34 +33,28 @@ private[sql] object DataSourceV2Utils extends Logging {
    * `spark.datasource.$keyPrefix`. A session config `spark.datasource.$keyPrefix.xxx -> yyy` will
    * be transformed into `xxx -> yyy`.
    *
-   * @param ds a [[DataSourceV2]] object
+   * @param source a [[TableProvider]] object
    * @param conf the session conf
    * @return an immutable map that contains all the extracted and transformed k/v pairs.
    */
-  def extractSessionConfigs(ds: DataSourceV2, conf: SQLConf): Map[String, String] = ds match {
-    case cs: SessionConfigSupport =>
-      val keyPrefix = cs.keyPrefix()
-      require(keyPrefix != null, "The data source config key prefix can't be null.")
-
-      val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.+)")
-
-      conf.getAllConfs.flatMap { case (key, value) =>
-        val m = pattern.matcher(key)
-        if (m.matches() && m.groupCount() > 0) {
-          Seq((m.group(1), value))
-        } else {
-          Seq.empty
+  def extractSessionConfigs(source: TableProvider, conf: SQLConf): Map[String, String] = {
+    source match {
+      case cs: SessionConfigSupport =>
+        val keyPrefix = cs.keyPrefix()
+        require(keyPrefix != null, "The data source config key prefix can't be null.")
+
+        val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.+)")
+
+        conf.getAllConfs.flatMap { case (key, value) =>
+          val m = pattern.matcher(key)
+          if (m.matches() && m.groupCount() > 0) {
+            Seq((m.group(1), value))
+          } else {
+            Seq.empty
+          }
         }
-      }
-
-    case _ => Map.empty
-  }
 
-  def failForUserSpecifiedSchema[T](ds: DataSourceV2): T = {
-    val name = ds match {
-      case register: DataSourceRegister => register.shortName()
-      case _ => ds.getClass.getName
+      case _ => Map.empty
     }
-    throw new UnsupportedOperationException(name + " source does not support user-specified schema")
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 2c33975..cca2790 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateCo
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock
 
@@ -513,13 +514,16 @@ class MicroBatchExecution(
 
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
-      case s: StreamingWriteSupportProvider =>
-        val writer = s.createStreamingWriteSupport(
-          s"$runId",
-          newAttributePlan.schema,
-          outputMode,
-          new DataSourceOptions(extraOptions.asJava))
-        WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
+      case s: SupportsStreamingWrite =>
+        // TODO: we should translate OutputMode to concrete write actions like truncate, but
+        // the truncate action is being developed in SPARK-26666.
+        val writeBuilder = s.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
+          .withQueryId(runId.toString)
+          .withInputDataSchema(newAttributePlan.schema)
+        val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
+          .outputMode(outputMode)
+          .buildForStreaming()
+        WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
 
@@ -549,7 +553,7 @@ class MicroBatchExecution(
       SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
         sink match {
           case s: Sink => s.addBatch(currentBatchId, nextBatch)
-          case _: StreamingWriteSupportProvider =>
+          case _: SupportsStreamingWrite =>
             // This doesn't accumulate any data - it just forces execution of the microbatch writer.
             nextBatch.collect()
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 83d38dc..1b7aa54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.sources.v2.{DataSourceV2, Table}
+import org.apache.spark.sql.sources.v2.{Table, TableProvider}
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
@@ -86,13 +86,13 @@ case class StreamingExecutionRelation(
 // know at read time whether the query is continuous or not, so we need to be able to
 // swap a V1 relation back in.
 /**
- * Used to link a [[DataSourceV2]] into a streaming
+ * Used to link a [[TableProvider]] into a streaming
  * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for creating
  * a streaming [[org.apache.spark.sql.DataFrame]] from [[org.apache.spark.sql.DataFrameReader]],
  * and should be converted before passing to [[StreamExecution]].
  */
 case class StreamingRelationV2(
-    dataSource: DataSourceV2,
+    source: TableProvider,
     sourceName: String,
     table: Table,
     extraOptions: Map[String, String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 9c5c16f..348bc76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.sources.ConsoleWriteSupport
+import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -30,17 +31,12 @@ case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
   override def schema: StructType = data.schema
 }
 
-class ConsoleSinkProvider extends DataSourceV2
-  with StreamingWriteSupportProvider
+class ConsoleSinkProvider extends TableProvider
   with DataSourceRegister
   with CreatableRelationProvider {
 
-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    new ConsoleWriteSupport(schema, options)
+  override def getTable(options: DataSourceOptions): Table = {
+    ConsoleTable
   }
 
   def createRelation(
@@ -60,3 +56,28 @@ class ConsoleSinkProvider extends DataSourceV2
 
   def shortName(): String = "console"
 }
+
+object ConsoleTable extends Table with SupportsStreamingWrite {
+
+  override def name(): String = "console"
+
+  override def schema(): StructType = StructType(Nil)
+
+  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+    new WriteBuilder with SupportsOutputMode {
+      private var inputSchema: StructType = _
+
+      override def withInputDataSchema(schema: StructType): WriteBuilder = {
+        this.inputSchema = schema
+        this
+      }
+
+      override def outputMode(mode: OutputMode): WriteBuilder = this
+
+      override def buildForStreaming(): StreamingWrite = {
+        assert(inputSchema != null)
+        new ConsoleWrite(inputSchema, options)
+      }
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index b22795d..20101c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -32,8 +32,9 @@ import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider, SupportsContinuousRead}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock
 
@@ -42,7 +43,7 @@ class ContinuousExecution(
     name: String,
     checkpointRoot: String,
     analyzedPlan: LogicalPlan,
-    sink: StreamingWriteSupportProvider,
+    sink: SupportsStreamingWrite,
     trigger: Trigger,
     triggerClock: Clock,
     outputMode: OutputMode,
@@ -174,12 +175,15 @@ class ContinuousExecution(
           "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
     }
 
-    val writer = sink.createStreamingWriteSupport(
-      s"$runId",
-      withNewSources.schema,
-      outputMode,
-      new DataSourceOptions(extraOptions.asJava))
-    val planWithSink = WriteToContinuousDataSource(writer, withNewSources)
+    // TODO: we should translate OutputMode to concrete write actions like truncate, but
+    // the truncate action is being developed in SPARK-26666.
+    val writeBuilder = sink.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
+      .withQueryId(runId.toString)
+      .withInputDataSchema(withNewSources.schema)
+    val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
+      .outputMode(outputMode)
+      .buildForStreaming()
+    val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)
 
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
@@ -214,9 +218,8 @@ class ContinuousExecution(
       trigger.asInstanceOf[ContinuousTrigger].intervalMs.toString)
 
     // Use the parent Spark session for the endpoint since it's where this query ID is registered.
-    val epochEndpoint =
-      EpochCoordinatorRef.create(
-        writer, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+    val epochEndpoint = EpochCoordinatorRef.create(
+      streamingWrite, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
     val epochUpdateThread = new Thread(new Runnable {
       override def run: Unit = {
         try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index d1bda79..a998422 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeR
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.util.RpcUtils
 
 private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
@@ -82,7 +82,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
    * Create a reference to a new [[EpochCoordinator]].
    */
   def create(
-      writeSupport: StreamingWriteSupport,
+      writeSupport: StreamingWrite,
       stream: ContinuousStream,
       query: ContinuousExecution,
       epochCoordinatorId: String,
@@ -115,7 +115,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
  *   have both committed and reported an end offset for a given epoch.
  */
 private[continuous] class EpochCoordinator(
-    writeSupport: StreamingWriteSupport,
+    writeSupport: StreamingWrite,
     stream: ContinuousStream,
     query: ContinuousExecution,
     startEpoch: Long,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
index 7ad21cc..54f484c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.streaming.continuous
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 
 /**
  * The logical plan for writing data in a continuous stream.
  */
-case class WriteToContinuousDataSource(
-    writeSupport: StreamingWriteSupport, query: LogicalPlan) extends LogicalPlan {
+case class WriteToContinuousDataSource(write: StreamingWrite, query: LogicalPlan)
+  extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index 2178466..2f3af6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -26,21 +26,22 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 
 /**
- * The physical plan for writing data into a continuous processing [[StreamingWriteSupport]].
+ * The physical plan for writing data into a continuous processing [[StreamingWrite]].
  */
-case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport, query: SparkPlan)
-    extends UnaryExecNode with Logging {
+case class WriteToContinuousDataSourceExec(write: StreamingWrite, query: SparkPlan)
+  extends UnaryExecNode with Logging {
+
   override def child: SparkPlan = query
   override def output: Seq[Attribute] = Nil
 
   override protected def doExecute(): RDD[InternalRow] = {
-    val writerFactory = writeSupport.createStreamingWriterFactory()
+    val writerFactory = write.createStreamingWriterFactory()
     val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
 
-    logInfo(s"Start processing data source write support: $writeSupport. " +
+    logInfo(s"Start processing data source write support: $write. " +
       s"The input RDD has ${rdd.partitions.length} partitions.")
     EpochCoordinatorRef.get(
       sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
similarity index 94%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
index 833e62f..f2ff30b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
@@ -22,12 +22,12 @@ import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
 
 /** Common methods used to create writes for the the console sink */
-class ConsoleWriteSupport(schema: StructType, options: DataSourceOptions)
-    extends StreamingWriteSupport with Logging {
+class ConsoleWrite(schema: StructType, options: DataSourceOptions)
+    extends StreamingWrite with Logging {
 
   // Number of rows to display, by default 20 rows
   protected val numRowsToShow = options.getInt("numRows", 20)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
similarity index 66%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 4218fd5..6fbb59c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -22,63 +22,73 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.python.PythonForeachWriter
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider}
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 /**
- * A [[org.apache.spark.sql.sources.v2.DataSourceV2]] for forwarding data into the specified
- * [[ForeachWriter]].
+ * A write-only table for forwarding data into the specified [[ForeachWriter]].
  *
  * @param writer The [[ForeachWriter]] to process all data.
  * @param converter An object to convert internal rows to target type T. Either it can be
  *                  a [[ExpressionEncoder]] or a direct converter function.
  * @tparam T The expected type of the sink.
  */
-case class ForeachWriteSupportProvider[T](
+case class ForeachWriterTable[T](
     writer: ForeachWriter[T],
     converter: Either[ExpressionEncoder[T], InternalRow => T])
-  extends StreamingWriteSupportProvider {
-
-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    new StreamingWriteSupport {
-      override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-      override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-
-      override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
-        val rowConverter: InternalRow => T = converter match {
-          case Left(enc) =>
-            val boundEnc = enc.resolveAndBind(
-              schema.toAttributes,
-              SparkSession.getActiveSession.get.sessionState.analyzer)
-            boundEnc.fromRow
-          case Right(func) =>
-            func
-        }
-        ForeachWriterFactory(writer, rowConverter)
+  extends Table with SupportsStreamingWrite {
+
+  override def name(): String = "ForeachSink"
+
+  override def schema(): StructType = StructType(Nil)
+
+  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+    new WriteBuilder with SupportsOutputMode {
+      private var inputSchema: StructType = _
+
+      override def withInputDataSchema(schema: StructType): WriteBuilder = {
+        this.inputSchema = schema
+        this
       }
 
-      override def toString: String = "ForeachSink"
+      override def outputMode(mode: OutputMode): WriteBuilder = this
+
+      override def buildForStreaming(): StreamingWrite = {
+        new StreamingWrite {
+          override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+          override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+          override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
+            val rowConverter: InternalRow => T = converter match {
+              case Left(enc) =>
+                val boundEnc = enc.resolveAndBind(
+                  inputSchema.toAttributes,
+                  SparkSession.getActiveSession.get.sessionState.analyzer)
+                boundEnc.fromRow
+              case Right(func) =>
+                func
+            }
+            ForeachWriterFactory(writer, rowConverter)
+          }
+        }
+      }
     }
   }
 }
 
-object ForeachWriteSupportProvider {
+object ForeachWriterTable {
   def apply[T](
       writer: ForeachWriter[T],
-      encoder: ExpressionEncoder[T]): ForeachWriteSupportProvider[_] = {
+      encoder: ExpressionEncoder[T]): ForeachWriterTable[_] = {
     writer match {
       case pythonWriter: PythonForeachWriter =>
-        new ForeachWriteSupportProvider[UnsafeRow](
+        new ForeachWriterTable[UnsafeRow](
           pythonWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow]))
       case _ =>
-        new ForeachWriteSupportProvider[T](writer, Left(encoder))
+        new ForeachWriterTable[T](writer, Left(encoder))
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
index 143235e..f395189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.streaming.sources
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 
 /**
  * A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It implements
  * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
  * streaming write support.
  */
-class MicroBatchWrite(epochId: Long, val writeSupport: StreamingWriteSupport) extends BatchWrite {
+class MicroBatchWrite(epochId: Long, val writeSupport: StreamingWrite) extends BatchWrite {
 
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     writeSupport.commit(epochId, messages)
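
As a rough illustration (not the engine's actual wiring), each micro-batch wraps the long-lived StreamingWrite in an epoch-scoped BatchWrite, so committing the batch forwards to the streaming commit with the captured epoch ID; the helper below is hypothetical:

    import org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
    import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite

    // `streamingWrite` is assumed to come from WriteBuilder.buildForStreaming().
    def commitEpoch(streamingWrite: StreamingWrite, epochId: Long,
        messages: Array[WriterCommitMessage]): Unit = {
      val batchWrite = new MicroBatchWrite(epochId, streamingWrite)
      // BatchWrite.commit forwards to StreamingWrite.commit(epochId, messages).
      batchWrite.commit(messages)
    }
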
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 075c6b9..3a00825 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -40,8 +40,7 @@ import org.apache.spark.sql.types._
  *    generated rows. The source will try its best to reach `rowsPerSecond`, but the query may
  *    be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
  */
-class RateStreamProvider extends DataSourceV2
-  with TableProvider with DataSourceRegister {
+class RateStreamProvider extends TableProvider with DataSourceRegister {
   import RateStreamProvider._
 
   override def getTable(options: DataSourceOptions): Table = {
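
Dropping the DataSourceV2 marker does not change how the source is consumed; for reference, a typical use of the rate source (assuming a local SparkSession) is:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("rate-demo").getOrCreate()

    // Produces (timestamp, value) rows at roughly `rowsPerSecond`, spread over `numPartitions`.
    val rates = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .option("numPartitions", "2")
      .load()
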
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index c3b24a8..8ac5bfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -31,8 +31,7 @@ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
-class TextSocketSourceProvider extends DataSourceV2
-  with TableProvider with DataSourceRegister with Logging {
+class TextSocketSourceProvider extends TableProvider with DataSourceRegister with Logging {
 
   private def checkParameters(params: DataSourceOptions): Unit = {
     logWarning("The socket source should not be used for production applications! " +
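
The same TableProvider-only change applies here; the socket source is still read as before, e.g. (test use only, as the warning above says, and reusing the `spark` session from the previous sketch):

    // Assumes something is listening on localhost:9999, e.g. `nc -lk 9999`.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()
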
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index c50dc7b..3fc2cbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -32,9 +32,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -42,15 +42,31 @@ import org.apache.spark.sql.types.StructType
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
-  with MemorySinkBase with Logging {
-
-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    new MemoryStreamingWriteSupport(this, mode, schema)
+class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Logging {
+
+  override def name(): String = "MemorySinkV2"
+
+  override def schema(): StructType = StructType(Nil)
+
+  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+    new WriteBuilder with SupportsOutputMode {
+      private var mode: OutputMode = _
+      private var inputSchema: StructType = _
+
+      override def outputMode(mode: OutputMode): WriteBuilder = {
+        this.mode = mode
+        this
+      }
+
+      override def withInputDataSchema(schema: StructType): WriteBuilder = {
+        this.inputSchema = schema
+        this
+      }
+
+      override def buildForStreaming(): StreamingWrite = {
+        new MemoryStreamingWrite(MemorySinkV2.this, mode, inputSchema)
+      }
+    }
   }
 
   private case class AddedData(batchId: Long, data: Array[Row])
@@ -122,9 +138,9 @@ class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
 case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
   extends WriterCommitMessage {}
 
-class MemoryStreamingWriteSupport(
+class MemoryStreamingWrite(
     val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
-  extends StreamingWriteSupport {
+  extends StreamingWrite {
 
   override def createStreamingWriterFactory: MemoryWriterFactory = {
     MemoryWriterFactory(outputMode, schema)
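
The memory sink keeps its public behaviour: results land in a queryable temp view named after the query. A typical test-style use, assuming a streaming DataFrame `df` and an active `spark` session:

    // Results of each batch are appended to the in-memory table "demo_table".
    val query = df.writeStream
      .format("memory")
      .queryName("demo_table")
      .outputMode("append")
      .start()

    query.processAllAvailable()   // wait until all available input has been processed
    spark.table("demo_table").show()
    query.stop()
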
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 8666818..ef21caa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -173,7 +173,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     ds match {
       case provider: TableProvider =>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-          ds = provider, conf = sparkSession.sessionState.conf)
+          source = provider, conf = sparkSession.sessionState.conf)
         val options = sessionOptions ++ extraOptions
         val dsOptions = new DataSourceOptions(options.asJava)
         val table = userSpecifiedSchema match {
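
The renamed `source` parameter points at the same mechanism as before: a provider that mixes in SessionConfigSupport has session configs of the form spark.datasource.<keyPrefix>.<option> folded into its options. A hypothetical provider (all names are illustrative):

    import org.apache.spark.sql.sources.DataSourceRegister
    import org.apache.spark.sql.sources.v2.{DataSourceOptions, SessionConfigSupport, Table, TableProvider}

    // With spark.conf.set("spark.datasource.my-source.path", "/tmp/out"), getTable()
    // receives the option "path" -> "/tmp/out" in its DataSourceOptions.
    class MySourceProvider extends TableProvider with DataSourceRegister with SessionConfigSupport {
      override def shortName(): String = "my-source"
      override def keyPrefix: String = shortName()
      override def getTable(options: DataSourceOptions): Table = {
        ??? // build a Table from options, e.g. options.get("path")
      }
    }
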
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index ea596ba..9841994 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, TableProvider}
 
 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -278,7 +278,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       query
     } else if (source == "foreach") {
       assertNotPartitioned("foreach")
-      val sink = ForeachWriteSupportProvider[T](foreachWriter, ds.exprEnc)
+      val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
       df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         extraOptions.get("checkpointLocation"),
@@ -304,30 +304,29 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         useTempCheckpointLocation = true,
         trigger = trigger)
     } else {
-      val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
+      val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
       val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
-      var options = extraOptions.toMap
-      val sink = ds.getConstructor().newInstance() match {
-        case w: StreamingWriteSupportProvider
-            if !disabledSources.contains(w.getClass.getCanonicalName) =>
-          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-            w, df.sparkSession.sessionState.conf)
-          options = sessionOptions ++ extraOptions
-          w
-        case _ =>
-          val ds = DataSource(
-            df.sparkSession,
-            className = source,
-            options = options,
-            partitionColumns = normalizedParCols.getOrElse(Nil))
-          ds.createSink(outputMode)
+      val useV1Source = disabledSources.contains(cls.getCanonicalName)
+
+      val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) {
+        val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          source = provider, conf = df.sparkSession.sessionState.conf)
+        val options = sessionOptions ++ extraOptions
+        val dsOptions = new DataSourceOptions(options.asJava)
+        provider.getTable(dsOptions) match {
+          case s: SupportsStreamingWrite => s
+          case _ => createV1Sink()
+        }
+      } else {
+        createV1Sink()
       }
 
       df.sparkSession.sessionState.streamingQueryManager.startQuery(
-        options.get("queryName"),
-        options.get("checkpointLocation"),
+        extraOptions.get("queryName"),
+        extraOptions.get("checkpointLocation"),
         df,
-        options,
+        extraOptions.toMap,
         sink,
         outputMode,
         useTempCheckpointLocation = source == "console" || source == "noop",
@@ -336,6 +335,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     }
   }
 
+  private def createV1Sink(): BaseStreamingSink = {
+    val ds = DataSource(
+      df.sparkSession,
+      className = source,
+      options = extraOptions.toMap,
+      partitionColumns = normalizedParCols.getOrElse(Nil))
+    ds.createSink(outputMode)
+  }
+
   /**
    * Sets the output of the streaming query to be processed using the provided writer object.
    * See [[org.apache.spark.sql.ForeachWriter]] for more details on the lifecycle and
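
One practical effect of the new branch: a V2 provider can still be pushed down the V1 path through the disabled-writers list consulted via `disabledV2StreamingWriters`. A sketch, assuming the usual `spark.sql.streaming.disabledV2Writers` key backs that accessor; the provider class name is a placeholder:

    // Force the hypothetical provider back to its V1 StreamSinkProvider / createSink path.
    spark.conf.set("spark.sql.streaming.disabledV2Writers", "com.example.MyV2Provider")

    df.writeStream
      .format("com.example.MyV2Provider")
      .option("checkpointLocation", "/tmp/chk")   // illustrative
      .start()
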
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 0bd8a92..a7fa800 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution,
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
@@ -261,7 +261,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     }
 
     (sink, trigger) match {
-      case (v2Sink: StreamingWriteSupportProvider, trigger: ContinuousTrigger) =>
+      case (v2Sink: SupportsStreamingWrite, trigger: ContinuousTrigger) =>
         if (operationCheckEnabled) {
           UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
         }
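
For context, the rewritten match is the gate a continuous query passes through: the resolved sink must be a SupportsStreamingWrite table when the trigger is continuous. A minimal end-to-end sketch using built-in sources and sinks (assuming an active `spark` session):

    import org.apache.spark.sql.streaming.Trigger

    // Continuous processing: the console sink resolves to a table that supports streaming
    // writes, so the (sink, trigger) match above accepts it and runs checkForContinuous.
    val continuousQuery = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .option("checkpointLocation", "/tmp/continuous-chk")
      .start()
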
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index a36b0cf..914af58 100644
--- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -9,6 +9,6 @@ org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly
 org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly
 org.apache.spark.sql.streaming.sources.FakeReadBothModes
 org.apache.spark.sql.streaming.sources.FakeReadNeitherMode
-org.apache.spark.sql.streaming.sources.FakeWriteSupportProvider
+org.apache.spark.sql.streaming.sources.FakeWriteOnly
 org.apache.spark.sql.streaming.sources.FakeNoWrite
 org.apache.spark.sql.streaming.sources.FakeWriteSupportProviderV1Fallback
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 6185736..e804377 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -43,9 +43,9 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
 
   test("streaming writer") {
     val sink = new MemorySinkV2
-    val writeSupport = new MemoryStreamingWriteSupport(
+    val write = new MemoryStreamingWrite(
       sink, OutputMode.Append(), new StructType().add("i", "int"))
-    writeSupport.commit(0,
+    write.commit(0,
       Array(
         MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
         MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
@@ -53,7 +53,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
       ))
     assert(sink.latestBatchId.contains(0))
     assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7))
-    writeSupport.commit(19,
+    write.commit(19,
       Array(
         MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))),
         MemoryWriterCommitMessage(0, Seq(Row(33)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
index f903c17..0b1e3b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
@@ -33,8 +33,8 @@ class DataSourceV2UtilsSuite extends SparkFunSuite {
     conf.setConfString(s"spark.sql.$keyPrefix.config.name", "false")
     conf.setConfString("spark.datasource.another.config.name", "123")
     conf.setConfString(s"spark.datasource.$keyPrefix.", "123")
-    val cs = classOf[DataSourceV2WithSessionConfig].getConstructor().newInstance()
-    val confs = DataSourceV2Utils.extractSessionConfigs(cs.asInstanceOf[DataSourceV2], conf)
+    val source = new DataSourceV2WithSessionConfig
+    val confs = DataSourceV2Utils.extractSessionConfigs(source, conf)
     assert(confs.size == 2)
     assert(confs.keySet.filter(_.startsWith("spark.datasource")).size == 0)
     assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index daca65f..c56a545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -38,8 +38,7 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/uniqueId/` to `target`.
  */
-class SimpleWritableDataSource extends DataSourceV2
-  with TableProvider with SessionConfigSupport {
+class SimpleWritableDataSource extends TableProvider with SessionConfigSupport {
 
   private val tableSchema = new StructType().add("i", "long").add("j", "long")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index d3d210c..bad2259 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
 
@@ -43,7 +43,7 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
   override def beforeEach(): Unit = {
     super.beforeEach()
     epochEndpoint = EpochCoordinatorRef.create(
-      mock[StreamingWriteSupport],
+      mock[StreamingWrite],
       mock[ContinuousStream],
       mock[ContinuousExecution],
       coordinatorId,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index a0b56ec..f74285f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.test.TestSparkSession
 
 class EpochCoordinatorSuite
@@ -40,13 +40,13 @@ class EpochCoordinatorSuite
 
   private var epochCoordinator: RpcEndpointRef = _
 
-  private var writeSupport: StreamingWriteSupport = _
+  private var writeSupport: StreamingWrite = _
   private var query: ContinuousExecution = _
   private var orderVerifier: InOrder = _
 
   override def beforeEach(): Unit = {
     val stream = mock[ContinuousStream]
-    writeSupport = mock[StreamingWriteSupport]
+    writeSupport = mock[StreamingWrite]
     query = mock[ContinuousExecution]
     orderVerifier = inOrder(writeSupport, query)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 62f1666..c841793 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -71,13 +71,10 @@ trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
   override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
 }
 
-trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
-  override def createStreamingWriteSupport(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
-    LastWriteOptions.options = options
+trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
+  override def name(): String = "fake"
+  override def schema(): StructType = StructType(Seq())
+  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
 }
@@ -129,20 +126,33 @@ class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
   }
 }
 
-class FakeWriteSupportProvider
+class FakeWriteOnly
     extends DataSourceRegister
-    with FakeStreamingWriteSupportProvider
+    with TableProvider
     with SessionConfigSupport {
   override def shortName(): String = "fake-write-microbatch-continuous"
 
   override def keyPrefix: String = shortName()
+
+  override def getTable(options: DataSourceOptions): Table = {
+    LastWriteOptions.options = options
+    new Table with FakeStreamingWriteTable {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Nil)
+    }
+  }
 }
 
-class FakeNoWrite extends DataSourceRegister {
+class FakeNoWrite extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-write-neither-mode"
+  override def getTable(options: DataSourceOptions): Table = {
+    new Table {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Nil)
+    }
+  }
 }
 
-
 case class FakeWriteV1FallbackException() extends Exception
 
 class FakeSink extends Sink {
@@ -150,17 +160,24 @@ class FakeSink extends Sink {
 }
 
 class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
-  with FakeStreamingWriteSupportProvider with StreamSinkProvider {
+  with TableProvider with StreamSinkProvider {
 
   override def createSink(
-    sqlContext: SQLContext,
-    parameters: Map[String, String],
-    partitionColumns: Seq[String],
-    outputMode: OutputMode): Sink = {
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
     new FakeSink()
   }
 
   override def shortName(): String = "fake-write-v1-fallback"
+
+  override def getTable(options: DataSourceOptions): Table = {
+    new Table with FakeStreamingWriteTable {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Nil)
+    }
+  }
 }
 
 object LastReadOptions {
@@ -260,7 +277,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
     testPositiveCaseWithQuery(
       "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v2Query =>
       assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
-        .isInstanceOf[FakeWriteSupportProviderV1Fallback])
+        .isInstanceOf[Table])
     }
 
     // Ensure we create a V1 sink with the config. Note the config is a comma separated
@@ -319,19 +336,20 @@ class StreamingDataSourceV2Suite extends StreamTest {
 
   for ((read, write, trigger) <- cases) {
     testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
-      val table = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+      val sourceTable = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+        .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
+
+      val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor()
         .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
-      val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).
-        getConstructor().newInstance()
 
-      (table, writeSource, trigger) match {
+      (sourceTable, sinkTable, trigger) match {
         // Valid microbatch queries.
-        case (_: SupportsMicroBatchRead, _: StreamingWriteSupportProvider, t)
+        case (_: SupportsMicroBatchRead, _: SupportsStreamingWrite, t)
             if !t.isInstanceOf[ContinuousTrigger] =>
           testPositiveCase(read, write, trigger)
 
         // Valid continuous queries.
-        case (_: SupportsContinuousRead, _: StreamingWriteSupportProvider,
+        case (_: SupportsContinuousRead, _: SupportsStreamingWrite,
               _: ContinuousTrigger) =>
           testPositiveCase(read, write, trigger)
 
@@ -342,12 +360,12 @@ class StreamingDataSourceV2Suite extends StreamTest {
             s"Data source $read does not support streamed reading")
 
         // Invalid - can't write
-        case (_, w, _) if !w.isInstanceOf[StreamingWriteSupportProvider] =>
+        case (_, w, _) if !w.isInstanceOf[SupportsStreamingWrite] =>
           testNegativeCase(read, write, trigger,
             s"Data source $write does not support streamed writing")
 
         // Invalid - trigger is continuous but reader is not
-        case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
+        case (r, _: SupportsStreamingWrite, _: ContinuousTrigger)
             if !r.isInstanceOf[SupportsContinuousRead] =>
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support continuous processing")

