Posted to commits@spark.apache.org by we...@apache.org on 2018/01/29 05:10:44 UTC

spark git commit: [SPARK-23196] Unify continuous and microbatch V2 sinks

Repository: spark
Updated Branches:
  refs/heads/master 686a622c9 -> 49b0207dc


[SPARK-23196] Unify continuous and microbatch V2 sinks

## What changes were proposed in this pull request?

Replace the separate continuous and microbatch streaming V2 sink interfaces with a unified StreamWriteSupport interface, plus a shim that adapts it to microbatch execution.
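
As a rough sketch (not part of this patch): with the unified interface, a third-party sink only needs the single StreamWriteSupport entry point. The StreamWriteSupport/StreamWriter signatures below come from this diff; the NoOpSinkProvider name and its discard-everything behavior are hypothetical.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
    import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
    import org.apache.spark.sql.sources.v2.writer._
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType

    // Hypothetical sink that discards every row. The same StreamWriter serves both execution
    // modes: continuous execution drives it directly, while microbatch execution wraps it in
    // the shim, so only the meaning of epochId differs.
    class NoOpSinkProvider extends DataSourceV2 with StreamWriteSupport {
      override def createStreamWriter(
          queryId: String,
          schema: StructType,
          mode: OutputMode,
          options: DataSourceV2Options): StreamWriter = new StreamWriter {
        override def createWriterFactory(): DataWriterFactory[Row] = NoOpWriterFactory
        override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
        override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
      }
    }

    case object NoOpCommitMessage extends WriterCommitMessage

    // Serialized and sent to executors; creates one no-op DataWriter per partition.
    case object NoOpWriterFactory extends DataWriterFactory[Row] {
      override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] =
        new DataWriter[Row] {
          override def write(row: Row): Unit = {}  // drop the row
          override def commit(): WriterCommitMessage = NoOpCommitMessage
          override def abort(): Unit = {}
        }
    }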

Add a new SQL config, spark.sql.streaming.disabledV2Writers, that disables V2 sinks for the listed sources and falls back to the V1 sink implementation.
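
For illustration (only the conf name, its semantics, and the Kafka provider class come from this diff), a query could be forced back onto the V1 Kafka sink like this:

    // Comma-separated, fully qualified DataSourceRegister class names; listed sources fall
    // back to their V1 Sink. Assumes an active SparkSession named `spark`.
    spark.conf.set(
      "spark.sql.streaming.disabledV2Writers",
      "org.apache.spark.sql.kafka010.KafkaSourceProvider")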

## How was this patch tested?

Existing tests. In the case of Kafka (the only existing continuous V2 sink), they now exercise the V2 path for microbatch execution as well.

Author: Jose Torres <jo...@databricks.com>

Closes #20369 from jose-torres/streaming-sink.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49b0207d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49b0207d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49b0207d

Branch: refs/heads/master
Commit: 49b0207dc9327989c72700b4d04d2a714c92e159
Parents: 686a622
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Jan 29 13:10:38 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jan 29 13:10:38 2018 +0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousWriter.scala    | 119 -------------------
 .../sql/kafka010/KafkaSourceProvider.scala      |  16 +--
 .../spark/sql/kafka010/KafkaStreamWriter.scala  | 115 ++++++++++++++++++
 .../sql/kafka010/KafkaContinuousSinkSuite.scala |   8 +-
 .../spark/sql/kafka010/KafkaSinkSuite.scala     |  14 ++-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |   8 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   9 ++
 .../v2/streaming/ContinuousWriteSupport.java    |  56 ---------
 .../v2/streaming/MicroBatchWriteSupport.java    |  60 ----------
 .../v2/streaming/StreamWriteSupport.java        |  54 +++++++++
 .../v2/streaming/writer/ContinuousWriter.java   |  44 -------
 .../v2/streaming/writer/StreamWriter.java       |  72 +++++++++++
 .../sources/v2/writer/DataSourceV2Writer.java   |   4 +-
 .../datasources/v2/WriteToDataSourceV2.scala    |  11 +-
 .../streaming/MicroBatchExecution.scala         |  19 +--
 .../spark/sql/execution/streaming/console.scala |  27 ++---
 .../continuous/ContinuousExecution.scala        |  19 ++-
 .../streaming/continuous/EpochCoordinator.scala |   9 +-
 .../streaming/sources/ConsoleWriter.scala       |  59 ++-------
 .../streaming/sources/MicroBatchWriter.scala    |  54 +++++++++
 .../execution/streaming/sources/memoryV2.scala  |  29 ++---
 .../spark/sql/streaming/DataStreamWriter.scala  |  10 +-
 .../sql/streaming/StreamingQueryManager.scala   |   9 +-
 ....apache.spark.sql.sources.DataSourceRegister |   7 +-
 .../execution/streaming/MemorySinkV2Suite.scala |   2 +-
 .../sources/StreamingDataSourceV2Suite.scala    | 112 ++++++++---------
 26 files changed, 457 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
deleted file mode 100644
index 9843f46..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
-import scala.collection.JavaConverters._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
-import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY}
-import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
-import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
-
-/**
- * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
- * don't need to really send one.
- */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
-
-/**
- * A [[ContinuousWriter]] for Kafka writing. Responsible for generating the writer factory.
- * @param topic The topic this writer is responsible for. If None, topic will be inferred from
- *              a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-class KafkaContinuousWriter(
-    topic: Option[String], producerParams: Map[String, String], schema: StructType)
-  extends ContinuousWriter with SupportsWriteInternalRow {
-
-  validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
-
-  override def createInternalRowWriterFactory(): KafkaContinuousWriterFactory =
-    KafkaContinuousWriterFactory(topic, producerParams, schema)
-
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
-}
-
-/**
- * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate
- * the per-task data writers.
- * @param topic The topic that should be written to. If None, topic will be inferred from
- *              a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-case class KafkaContinuousWriterFactory(
-    topic: Option[String], producerParams: Map[String, String], schema: StructType)
-  extends DataWriterFactory[InternalRow] {
-
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
-    new KafkaContinuousDataWriter(topic, producerParams, schema.toAttributes)
-  }
-}
-
-/**
- * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
- * process incoming rows.
- *
- * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
- *                    from a `topic` field in the incoming data.
- * @param producerParams Parameters to use for the Kafka producer.
- * @param inputSchema The attributes in the input data.
- */
-class KafkaContinuousDataWriter(
-    targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
-  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
-  import scala.collection.JavaConverters._
-
-  private lazy val producer = CachedKafkaProducer.getOrCreate(
-    new java.util.HashMap[String, Object](producerParams.asJava))
-
-  def write(row: InternalRow): Unit = {
-    checkForErrors()
-    sendRow(row, producer)
-  }
-
-  def commit(): WriterCommitMessage = {
-    // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
-    // This requires flushing and then checking that no callbacks produced errors.
-    // We also check for errors before to fail as soon as possible - the check is cheap.
-    checkForErrors()
-    producer.flush()
-    checkForErrors()
-    KafkaWriterCommitMessage
-  }
-
-  def abort(): Unit = {}
-
-  def close(): Unit = {
-    checkForErrors()
-    if (producer != null) {
-      producer.flush()
-      checkForErrors()
-      CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 62a998f..2deb7fa 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -28,11 +28,11 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source}
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -46,7 +46,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
-    with ContinuousWriteSupport
+    with StreamWriteSupport
     with ContinuousReadSupport
     with Logging {
   import KafkaSourceProvider._
@@ -223,11 +223,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
   }
 
-  override def createContinuousWriter(
+  override def createStreamWriter(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceV2Options): Optional[ContinuousWriter] = {
+      options: DataSourceV2Options): StreamWriter = {
     import scala.collection.JavaConverters._
 
     val spark = SparkSession.getActiveSession.get
@@ -238,7 +238,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     KafkaWriter.validateQuery(
       schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)
 
-    Optional.of(new KafkaContinuousWriter(topic, producerParams, schema))
+    new KafkaStreamWriter(topic, producerParams, schema)
   }
 
   private def strategy(caseInsensitiveParams: Map[String, String]) =

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
new file mode 100644
index 0000000..a24efde
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
+ * don't need to really send one.
+ */
+case object KafkaWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[StreamWriter]] for Kafka writing. Responsible for generating the writer factory.
+ *
+ * @param topic The topic this writer is responsible for. If None, topic will be inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+class KafkaStreamWriter(
+    topic: Option[String], producerParams: Map[String, String], schema: StructType)
+  extends StreamWriter with SupportsWriteInternalRow {
+
+  validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
+
+  override def createInternalRowWriterFactory(): KafkaStreamWriterFactory =
+    KafkaStreamWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+/**
+ * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate
+ * the per-task data writers.
+ * @param topic The topic that should be written to. If None, topic will be inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+case class KafkaStreamWriterFactory(
+    topic: Option[String], producerParams: Map[String, String], schema: StructType)
+  extends DataWriterFactory[InternalRow] {
+
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+    new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
+ * process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
+ *                    from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+class KafkaStreamDataWriter(
+    targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+  import scala.collection.JavaConverters._
+
+  private lazy val producer = CachedKafkaProducer.getOrCreate(
+    new java.util.HashMap[String, Object](producerParams.asJava))
+
+  def write(row: InternalRow): Unit = {
+    checkForErrors()
+    sendRow(row, producer)
+  }
+
+  def commit(): WriterCommitMessage = {
+    // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
+    // This requires flushing and then checking that no callbacks produced errors.
+    // We also check for errors before to fail as soon as possible - the check is cheap.
+    checkForErrors()
+    producer.flush()
+    checkForErrors()
+    KafkaWriterCommitMessage
+  }
+
+  def abort(): Unit = {}
+
+  def close(): Unit = {
+    checkForErrors()
+    if (producer != null) {
+      producer.flush()
+      checkForErrors()
+      CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index 8487a69..fc890a0 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -18,16 +18,14 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Locale
-import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.time.SpanSugar._
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
-import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{BinaryType, DataType}
 import org.apache.spark.util.Utils
@@ -362,7 +360,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     } finally {
       writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
+    assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
   }
 
   test("streaming - exception on config serializer") {
@@ -424,7 +422,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
     val inputSchema = Seq(AttributeReference("value", BinaryType)())
     val data = new Array[Byte](15000) // large value
-    val writeTask = new KafkaContinuousDataWriter(Some(topic), options.asScala.toMap, inputSchema)
+    val writeTask = new KafkaStreamDataWriter(Some(topic), options.asScala.toMap, inputSchema)
     try {
       val fieldTypes: Array[DataType] = Array(BinaryType)
       val converter = UnsafeProjection.create(fieldTypes)

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 2ab336c..42f8b4c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -336,27 +336,31 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
     } finally {
       writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
+    assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
   }
 
   test("streaming - exception on config serializer") {
     val input = MemoryStream[String]
     var writer: StreamingQuery = null
     var ex: Exception = null
-    ex = intercept[IllegalArgumentException] {
+    ex = intercept[StreamingQueryException] {
       writer = createKafkaWriter(
         input.toDF(),
         withOptions = Map("kafka.key.serializer" -> "foo"))()
+      input.addData("1")
+      writer.processAllAvailable()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
       "kafka option 'key.serializer' is not supported"))
 
-    ex = intercept[IllegalArgumentException] {
+    ex = intercept[StreamingQueryException] {
       writer = createKafkaWriter(
         input.toDF(),
         withOptions = Map("kafka.value.serializer" -> "foo"))()
+      input.addData("1")
+      writer.processAllAvailable()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
       "kafka option 'value.serializer' is not supported"))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index c4cb1bc..02c8764 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -29,19 +29,17 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
-import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.{Dataset, ForeachWriter}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
-import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryWriter
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 76b9d6f..2c70b00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1127,6 +1127,13 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(100)
 
+  val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
+    .internal()
+    .doc("A comma-separated list of fully qualified data source register class names for which" +
+      " StreamWriteSupport is disabled. Writes to these sources will fail back to the V1 Sink.")
+    .stringConf
+    .createWithDefault("")
+
   object PartitionOverwriteMode extends Enumeration {
     val STATIC, DYNAMIC = Value
   }
@@ -1494,6 +1501,8 @@ class SQLConf extends Serializable with Logging {
   def continuousStreamingExecutorPollIntervalMs: Long =
     getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)
 
+  def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)
+
   def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
 
   def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
deleted file mode 100644
index dee493c..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for continuous stream processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousWriteSupport extends BaseStreamingSink {
-
-    /**
-     * Creates an optional {@link ContinuousWriter} to save the data to this data source. Data
-     * sources can return None if there is no writing needed to be done.
-     *
-     * @param queryId A unique string for the writing query. It's possible that there are many
-     *                writing queries running at the same time, and the returned
-     *                {@link DataSourceV2Writer} can use this id to distinguish itself from others.
-     * @param schema the schema of the data to be written.
-     * @param mode the output mode which determines what successive epoch output means to this
-     *             sink, please refer to {@link OutputMode} for more details.
-     * @param options the options for the returned data source writer, which is an immutable
-     *                case-insensitive string-to-string map.
-     */
-    Optional<ContinuousWriter> createContinuousWriter(
-        String queryId,
-        StructType schema,
-        OutputMode mode,
-        DataSourceV2Options options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
deleted file mode 100644
index 53ffa95..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability and save the data from a microbatch to the data source.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchWriteSupport extends BaseStreamingSink {
-
-  /**
-   * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
-   * sources can return None if there is no writing needed to be done.
-   *
-   * @param queryId A unique string for the writing query. It's possible that there are many writing
-   *                queries running at the same time, and the returned {@link DataSourceV2Writer}
-   *                can use this id to distinguish itself from others.
-   * @param epochId The unique numeric ID of the batch within this writing query. This is an
-   *                incrementing counter representing a consistent set of data; the same batch may
-   *                be started multiple times in failure recovery scenarios, but it will always
-   *                contain the same records.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive batch output means to this
-   *             sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  Optional<DataSourceV2Writer> createMicroBatchWriter(
-      String queryId,
-      long epochId,
-      StructType schema,
-      OutputMode mode,
-      DataSourceV2Options options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
new file mode 100644
index 0000000..6cd219c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability for structured streaming.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriteSupport extends BaseStreamingSink {
+
+    /**
+     * Creates an optional {@link StreamWriter} to save the data to this data source. Data
+     * sources can return None if there is no writing needed to be done.
+     *
+     * @param queryId A unique string for the writing query. It's possible that there are many
+     *                writing queries running at the same time, and the returned
+     *                {@link DataSourceV2Writer} can use this id to distinguish itself from others.
+     * @param schema the schema of the data to be written.
+     * @param mode the output mode which determines what successive epoch output means to this
+     *             sink, please refer to {@link OutputMode} for more details.
+     * @param options the options for the returned data source writer, which is an immutable
+     *                case-insensitive string-to-string map.
+     */
+    StreamWriter createStreamWriter(
+        String queryId,
+        StructType schema,
+        OutputMode mode,
+        DataSourceV2Options options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
deleted file mode 100644
index 723395b..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.writer;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-
-/**
- * A {@link DataSourceV2Writer} for use with continuous stream processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousWriter extends DataSourceV2Writer {
-  /**
-   * Commits this writing job for the specified epoch with a list of commit messages. The commit
-   * messages are collected from successful data writers and are produced by
-   * {@link DataWriter#commit()}.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to have been
-   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
-   */
-  void commit(long epochId, WriterCommitMessage[] messages);
-
-  default void commit(WriterCommitMessage[] messages) {
-    throw new UnsupportedOperationException(
-       "Commit without epoch should not be called with ContinuousWriter");
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
new file mode 100644
index 0000000..3156c88
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+/**
+ * A {@link DataSourceV2Writer} for use with structured streaming. This writer handles commits and
+ * aborts relative to an epoch ID determined by the execution engine.
+ *
+ * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
+ * and so must reset any internal state after a successful commit.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriter extends DataSourceV2Writer {
+  /**
+   * Commits this writing job for the specified epoch with a list of commit messages. The commit
+   * messages are collected from successful data writers and are produced by
+   * {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have been
+   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
+   *
+   * To support exactly-once processing, writer implementations should ensure that this method is
+   * idempotent. The execution engine may call commit() multiple times for the same epoch
+   * in some circumstances.
+   */
+  void commit(long epochId, WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed and keep failing when retry, or
+   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
+   *
+   * If this method fails (by throwing an exception), the underlying data source may require manual
+   * cleanup.
+   *
+   * Unless the abort is triggered by the failure of commit, the given messages should have some
+   * null slots as there maybe only a few data writers that are committed before the abort
+   * happens, or some data writers were committed but their commit messages haven't reached the
+   * driver when the abort is triggered. So this is just a "best effort" for data sources to
+   * clean up the data left by data writers.
+   */
+  void abort(long epochId, WriterCommitMessage[] messages);
+
+  default void commit(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+        "Commit without epoch should not be called with StreamWriter");
+  }
+
+  default void abort(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+        "Abort without epoch should not be called with StreamWriter");
+  }
+}
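
(Editorial aside, not part of the patch: the idempotency note in the javadoc above can be satisfied by remembering the last committed epoch. The sketch below is one assumed approach, reusing only the StreamWriter signature from this file; the IdempotentCommits name and doCommit hook are hypothetical.)

    import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

    // Skips re-delivered commits of an epoch that was already committed. Purely in-memory,
    // so it only guards replays within the lifetime of a single writer instance.
    trait IdempotentCommits extends StreamWriter {
      @volatile private var lastCommittedEpoch: Long = -1L

      protected def doCommit(epochId: Long, messages: Array[WriterCommitMessage]): Unit

      override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
        if (epochId > lastCommittedEpoch) {
          doCommit(epochId, messages)
          lastCommittedEpoch = epochId
        }
      }
    }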

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
index f1ef411..8048f50 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
@@ -28,9 +28,7 @@ import org.apache.spark.sql.types.StructType;
 /**
  * A data source writer that is returned by
  * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}/
- * {@link org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport#createMicroBatchWriter(
- * String, long, StructType, OutputMode, DataSourceV2Options)}/
- * {@link org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport#createContinuousWriter(
+ * {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter(
  * String, StructType, OutputMode, DataSourceV2Options)}.
  * It can mix in various writing optimization interfaces to speed up the data saving. The actual
  * writing logic is delegated to {@link DataWriter}.

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 3dbdae7..cd6b3e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -26,9 +26,8 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -62,7 +61,9 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
 
     try {
       val runTask = writer match {
-        case w: ContinuousWriter =>
+        // This case means that we're doing continuous processing. In microbatch streaming, the
+        // StreamWriter is wrapped in a MicroBatchWriter, which is executed as a normal batch.
+        case w: StreamWriter =>
           EpochCoordinatorRef.get(
             sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
             sparkContext.env)
@@ -82,13 +83,13 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
         (index, message: WriterCommitMessage) => messages(index) = message
       )
 
-      if (!writer.isInstanceOf[ContinuousWriter]) {
+      if (!writer.isInstanceOf[StreamWriter]) {
         logInfo(s"Data source writer $writer is committing.")
         writer.commit(messages)
         logInfo(s"Data source writer $writer committed.")
       }
     } catch {
-      case _: InterruptedException if writer.isInstanceOf[ContinuousWriter] =>
+      case _: InterruptedException if writer.isInstanceOf[StreamWriter] =>
         // Interruption is how continuous queries are ended, so accept and ignore the exception.
       case cause: Throwable =>
         logError(s"Data source writer $writer is aborting.")

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 7c38045..9759752 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -28,9 +28,11 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Curre
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}
 
@@ -440,15 +442,18 @@ class MicroBatchExecution(
 
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
-      case s: MicroBatchWriteSupport =>
-        val writer = s.createMicroBatchWriter(
+      case s: StreamWriteSupport =>
+        val writer = s.createStreamWriter(
           s"$runId",
-          currentBatchId,
           newAttributePlan.schema,
           outputMode,
           new DataSourceV2Options(extraOptions.asJava))
-        assert(writer.isPresent, "microbatch writer must always be present")
-        WriteToDataSourceV2(writer.get, newAttributePlan)
+        if (writer.isInstanceOf[SupportsWriteInternalRow]) {
+          WriteToDataSourceV2(
+            new InternalRowMicroBatchWriter(currentBatchId, writer), newAttributePlan)
+        } else {
+          WriteToDataSourceV2(new MicroBatchWriter(currentBatchId, writer), newAttributePlan)
+        }
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
 
@@ -471,7 +476,7 @@ class MicroBatchExecution(
       SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
         sink match {
           case s: Sink => s.addBatch(currentBatchId, nextBatch)
-          case s: MicroBatchWriteSupport =>
+          case _: StreamWriteSupport =>
             // This doesn't accumulate any data - it just forces execution of the microbatch writer.
             nextBatch.collect()
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index f2aa325..d5ac0bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -17,15 +17,12 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.util.Optional
-
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.sources.{ConsoleContinuousWriter, ConsoleMicroBatchWriter, ConsoleWriter}
+import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
 import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -35,26 +32,16 @@ case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
 }
 
 class ConsoleSinkProvider extends DataSourceV2
-  with MicroBatchWriteSupport
-  with ContinuousWriteSupport
+  with StreamWriteSupport
   with DataSourceRegister
   with CreatableRelationProvider {
 
-  override def createMicroBatchWriter(
-      queryId: String,
-      batchId: Long,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
-    Optional.of(new ConsoleMicroBatchWriter(batchId, schema, options))
-  }
-
-  override def createContinuousWriter(
+  override def createStreamWriter(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceV2Options): Optional[ContinuousWriter] = {
-    Optional.of(new ConsoleContinuousWriter(schema, options))
+      options: DataSourceV2Options): StreamWriter = {
+    new ConsoleWriter(schema, options)
   }
 
   def createRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 462e7d9..60f880f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -24,17 +24,16 @@ import java.util.function.UnaryOperator
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
 
-import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, Offset, PartitionOffset}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{Clock, Utils}
@@ -44,7 +43,7 @@ class ContinuousExecution(
     name: String,
     checkpointRoot: String,
     analyzedPlan: LogicalPlan,
-    sink: ContinuousWriteSupport,
+    sink: StreamWriteSupport,
     trigger: Trigger,
     triggerClock: Clock,
     outputMode: OutputMode,
@@ -195,12 +194,12 @@ class ContinuousExecution(
           "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
     }
 
-    val writer = sink.createContinuousWriter(
+    val writer = sink.createStreamWriter(
       s"$runId",
       triggerLogicalPlan.schema,
       outputMode,
       new DataSourceV2Options(extraOptions.asJava))
-    val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
+    val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
 
     val reader = withSink.collect {
       case DataSourceV2Relation(_, r: ContinuousReader) => r
@@ -230,7 +229,7 @@ class ContinuousExecution(
     // Use the parent Spark session for the endpoint since it's where this query ID is registered.
     val epochEndpoint =
       EpochCoordinatorRef.create(
-        writer.get(), reader, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+        writer, reader, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
     val epochUpdateThread = new Thread(new Runnable {
       override def run: Unit = {
         try {

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index 90b3584..84d2621 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -17,17 +17,14 @@
 
 package org.apache.spark.sql.execution.streaming.continuous
 
-import java.util.concurrent.atomic.AtomicLong
-
 import scala.collection.mutable
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
 import org.apache.spark.util.RpcUtils
 
@@ -85,7 +82,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
    * Create a reference to a new [[EpochCoordinator]].
    */
   def create(
-      writer: ContinuousWriter,
+      writer: StreamWriter,
       reader: ContinuousReader,
       query: ContinuousExecution,
       epochCoordinatorId: String,
@@ -118,7 +115,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
  *   have both committed and reported an end offset for a given epoch.
  */
 private[continuous] class EpochCoordinator(
-    writer: ContinuousWriter,
+    writer: StreamWriter,
     reader: ContinuousReader,
     query: ContinuousExecution,
     startEpoch: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
index 6fb61df..7c1700f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql.execution.streaming.sources
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
-import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
 
 /** Common methods used to create writes for the console sink */
-trait ConsoleWriter extends Logging {
-
-  def options: DataSourceV2Options
+class ConsoleWriter(schema: StructType, options: DataSourceV2Options)
+    extends StreamWriter with Logging {
 
   // Number of rows to display, by default 20 rows
   protected val numRowsToShow = options.getInt("numRows", 20)
@@ -40,14 +39,20 @@ trait ConsoleWriter extends Logging {
 
   def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
 
-  def abort(messages: Array[WriterCommitMessage]): Unit = {}
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    // We have to print a "Batch" label for the epoch for compatibility with the pre-data source V2
+    // behavior.
+    printRows(messages, schema, s"Batch: $epochId")
+  }
+
+  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
 
   protected def printRows(
       commitMessages: Array[WriterCommitMessage],
       schema: StructType,
       printMessage: String): Unit = {
     val rows = commitMessages.collect {
-      case PackedRowCommitMessage(rows) => rows
+      case PackedRowCommitMessage(rs) => rs
     }.flatten
 
     // scalastyle:off println
@@ -59,46 +64,8 @@ trait ConsoleWriter extends Logging {
       .createDataFrame(spark.sparkContext.parallelize(rows), schema)
       .show(numRowsToShow, isTruncated)
   }
-}
-
-
-/**
- * A [[DataSourceV2Writer]] that collects results from a micro-batch query to the driver and
- * prints them in the console. Created by
- * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
- *
- * This sink should not be used for production, as it requires sending all rows to the driver
- * and does not support recovery.
- */
-class ConsoleMicroBatchWriter(batchId: Long, schema: StructType, val options: DataSourceV2Options)
-  extends DataSourceV2Writer with ConsoleWriter {
-
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    printRows(messages, schema, s"Batch: $batchId")
-  }
-
-  override def toString(): String = {
-    s"ConsoleMicroBatchWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
-  }
-}
-
-
-/**
- * A [[DataSourceV2Writer]] that collects results from a continuous query to the driver and
- * prints them in the console. Created by
- * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
- *
- * This sink should not be used for production, as it requires sending all rows to the driver
- * and does not support recovery.
- */
-class ConsoleContinuousWriter(schema: StructType, val options: DataSourceV2Options)
-  extends ContinuousWriter with ConsoleWriter {
-
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
-    printRows(messages, schema, s"Continuous processing epoch $epochId")
-  }
 
   override def toString(): String = {
-    s"ConsoleContinuousWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+    s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
   }
 }
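
For reference, this is how the unified console writer surfaces to users: the same options feed numRowsToShow/isTruncated above, and each commit prints under a "Batch: <epochId>" header regardless of execution mode. A sketch only; it assumes an active SparkSession named spark, and the rate source and trigger are merely illustrative.

  import org.apache.spark.sql.streaming.Trigger

  val query = spark.readStream
    .format("rate")                        // any streaming source works here
    .load()
    .writeStream
    .format("console")
    .option("numRows", "5")                // -> numRowsToShow
    .option("truncate", "false")           // -> isTruncated
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()
  // Each trigger's output is printed by ConsoleWriter.commit(batchId, messages).
  query.stop()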

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
new file mode 100644
index 0000000..d7f3ba8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
+
+/**
+ * A [[DataSourceV2Writer]] used to hook V2 stream writers into a microbatch plan. It implements
+ * the non-streaming interface, forwarding the batch ID determined at construction to a wrapped
+ * streaming writer.
+ */
+class MicroBatchWriter(batchId: Long, writer: StreamWriter) extends DataSourceV2Writer {
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    writer.commit(batchId, messages)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = writer.abort(batchId, messages)
+
+  override def createWriterFactory(): DataWriterFactory[Row] = writer.createWriterFactory()
+}
+
+class InternalRowMicroBatchWriter(batchId: Long, writer: StreamWriter)
+  extends DataSourceV2Writer with SupportsWriteInternalRow {
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    writer.commit(batchId, messages)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = writer.abort(batchId, messages)
+
+  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] =
+    writer match {
+      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+      case _ => throw new IllegalStateException(
+        "InternalRowMicroBatchWriter should only be created with base writer support")
+    }
+}
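
To make the role of this shim concrete, here is a sketch of roughly what MicroBatchExecution now does on each trigger for a V2 sink (modulo the InternalRow variant). The helper name is invented, and the hardcoded output mode and empty options are simplifications; only the constructors and signatures shown in this diff are assumed.

  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
  import org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter
  import org.apache.spark.sql.sources.v2.DataSourceV2Options
  import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
  import org.apache.spark.sql.streaming.OutputMode
  import org.apache.spark.sql.types.StructType

  def planMicroBatchWrite(
      sink: StreamWriteSupport,
      queryId: String,
      batchId: Long,
      schema: StructType,
      plan: LogicalPlan): WriteToDataSourceV2 = {
    val streamWriter = sink.createStreamWriter(
      queryId, schema, OutputMode.Append(),
      new DataSourceV2Options(new java.util.HashMap[String, String]()))
    // The shim pins batchId, so streamWriter only ever sees (epochId, messages) pairs.
    WriteToDataSourceV2(new MicroBatchWriter(batchId, streamWriter), plan)
  }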

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index da7c31c..ce55e44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -30,8 +30,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -40,24 +40,13 @@ import org.apache.spark.sql.types.StructType
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySinkV2 extends DataSourceV2
-  with MicroBatchWriteSupport with ContinuousWriteSupport with Logging {
-
-  override def createMicroBatchWriter(
+class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging {
+  override def createStreamWriter(
       queryId: String,
-      batchId: Long,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = {
-    java.util.Optional.of(new MemoryWriter(this, batchId, mode))
-  }
-
-  override def createContinuousWriter(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = {
-    java.util.Optional.of(new ContinuousMemoryWriter(this, mode))
+      options: DataSourceV2Options): StreamWriter = {
+    new MemoryStreamWriter(this, mode)
   }
 
   private case class AddedData(batchId: Long, data: Array[Row])
@@ -141,8 +130,8 @@ class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode)
   }
 }
 
-class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode)
-  extends ContinuousWriter {
+class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
+  extends StreamWriter {
 
   override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode)
 
@@ -153,7 +142,7 @@ class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode)
     sink.write(epochId, outputMode, newRows)
   }
 
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     // Don't accept any of the new input.
   }
 }
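
As a quick sanity check of the new shape (mirroring the MemorySinkV2Suite change further down in this diff), the memory writer can be driven by hand; the epoch/batch ID is now an explicit argument to commit(). Values are illustrative.

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.execution.streaming.sources.{MemorySinkV2, MemoryStreamWriter, MemoryWriterCommitMessage}
  import org.apache.spark.sql.streaming.OutputMode

  val sink = new MemorySinkV2
  val writer = new MemoryStreamWriter(sink, OutputMode.Append())
  // Commit epoch 0 with two rows from a single (fake) partition.
  writer.commit(0, Array(MemoryWriterCommitMessage(0, Seq(Row(1), Row(2)))))
  // The committed rows are now queryable from the sink (see MemorySinkV2Suite below).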

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index d24f0dd..3b5b30d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources.{MemoryPlanV2, MemorySinkV2}
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
 
 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -281,11 +281,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         trigger = trigger)
     } else {
       val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
-      val sink = (ds.newInstance(), trigger) match {
-        case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
-        case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException(
-            s"Data source $source does not support continuous writing")
-        case (w: MicroBatchWriteSupport, _) => w
+      val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
+      val sink = ds.newInstance() match {
+        case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
         case _ =>
           val ds = DataSource(
             df.sparkSession,
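
A sketch of the fallback knob used just above: listing a writer's canonical class name in the new SQL config makes DataStreamWriter skip the V2 path and build the V1 sink instead (provided the source also implements StreamSinkProvider). It assumes an active SparkSession spark and a streaming DataFrame df; the class and format names below are invented for the example.

  import org.apache.spark.sql.internal.SQLConf

  // Comma-separated list of canonical class names to exclude from the V2 write path.
  spark.conf.set(
    SQLConf.DISABLED_V2_STREAMING_WRITERS.key,
    "com.example.MyStreamWriteSupport")

  // Even though com.example.MyStreamWriteSupport implements StreamWriteSupport,
  // the query below is planned against its V1 StreamSinkProvider implementation.
  df.writeStream
    .format("my-sink")
    .option("checkpointLocation", "/tmp/my-sink-checkpoint")
    .start()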

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 4b27e0d..fdd709c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger}
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
@@ -241,7 +241,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     }
 
     (sink, trigger) match {
-      case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) =>
+      case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
         UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
         new StreamingQueryWrapper(new ContinuousExecution(
           sparkSession,
@@ -254,7 +254,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
           outputMode,
           extraOptions,
           deleteCheckpointOnStop))
-      case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
+      case _ =>
         new StreamingQueryWrapper(new MicroBatchExecution(
           sparkSession,
           userSpecifiedName.orNull,
@@ -266,9 +266,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
           outputMode,
           extraOptions,
           deleteCheckpointOnStop))
-      case (_: ContinuousWriteSupport, t) if !t.isInstanceOf[ContinuousTrigger] =>
-        throw new AnalysisException(
-          "Sink only supports continuous writes, but a continuous trigger was not specified.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index a0b25b4..46b38be 100644
--- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -9,7 +9,6 @@ org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly
 org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly
 org.apache.spark.sql.streaming.sources.FakeReadBothModes
 org.apache.spark.sql.streaming.sources.FakeReadNeitherMode
-org.apache.spark.sql.streaming.sources.FakeWriteMicroBatchOnly
-org.apache.spark.sql.streaming.sources.FakeWriteContinuousOnly
-org.apache.spark.sql.streaming.sources.FakeWriteBothModes
-org.apache.spark.sql.streaming.sources.FakeWriteNeitherMode
+org.apache.spark.sql.streaming.sources.FakeWrite
+org.apache.spark.sql.streaming.sources.FakeNoWrite
+org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 00d4f0b..9be22d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -40,7 +40,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
 
   test("continuous writer") {
     val sink = new MemorySinkV2
-    val writer = new ContinuousMemoryWriter(sink, OutputMode.Append())
+    val writer = new MemoryStreamWriter(sink, OutputMode.Append())
     writer.commit(0,
       Array(
         MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),

http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index f152174..d4f8bae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -19,18 +19,18 @@ package org.apache.spark.sql.streaming.sources
 
 import java.util.Optional
 
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.streaming.{LongOffset, RateStreamOffset}
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper}
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
 import org.apache.spark.sql.sources.v2.reader.ReadTask
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport, MicroBatchReadSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming._
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
-import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -64,23 +64,12 @@ trait FakeContinuousReadSupport extends ContinuousReadSupport {
       options: DataSourceV2Options): ContinuousReader = FakeReader()
 }
 
-trait FakeMicroBatchWriteSupport extends MicroBatchWriteSupport {
-  def createMicroBatchWriter(
+trait FakeStreamWriteSupport extends StreamWriteSupport {
+  override def createStreamWriter(
       queryId: String,
-      epochId: Long,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
-    throw new IllegalStateException("fake sink - cannot actually write")
-  }
-}
-
-trait FakeContinuousWriteSupport extends ContinuousWriteSupport {
-  def createContinuousWriter(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceV2Options): Optional[ContinuousWriter] = {
+      options: DataSourceV2Options): StreamWriter = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
 }
@@ -102,23 +91,36 @@ class FakeReadNeitherMode extends DataSourceRegister {
   override def shortName(): String = "fake-read-neither-mode"
 }
 
-class FakeWriteMicroBatchOnly extends DataSourceRegister with FakeMicroBatchWriteSupport {
-  override def shortName(): String = "fake-write-microbatch-only"
+class FakeWrite extends DataSourceRegister with FakeStreamWriteSupport {
+  override def shortName(): String = "fake-write-microbatch-continuous"
 }
 
-class FakeWriteContinuousOnly extends DataSourceRegister with FakeContinuousWriteSupport {
-  override def shortName(): String = "fake-write-continuous-only"
+class FakeNoWrite extends DataSourceRegister {
+  override def shortName(): String = "fake-write-neither-mode"
 }
 
-class FakeWriteBothModes extends DataSourceRegister
-    with FakeMicroBatchWriteSupport with FakeContinuousWriteSupport {
-  override def shortName(): String = "fake-write-microbatch-continuous"
+
+case class FakeWriteV1FallbackException() extends Exception
+
+class FakeSink extends Sink {
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {}
 }
 
-class FakeWriteNeitherMode extends DataSourceRegister {
-  override def shortName(): String = "fake-write-neither-mode"
+class FakeWriteV1Fallback extends DataSourceRegister
+  with FakeStreamWriteSupport with StreamSinkProvider {
+
+  override def createSink(
+    sqlContext: SQLContext,
+    parameters: Map[String, String],
+    partitionColumns: Seq[String],
+    outputMode: OutputMode): Sink = {
+    new FakeSink()
+  }
+
+  override def shortName(): String = "fake-write-v1-fallback"
 }
 
+
 class StreamingDataSourceV2Suite extends StreamTest {
 
   override def beforeAll(): Unit = {
@@ -133,8 +135,6 @@ class StreamingDataSourceV2Suite extends StreamTest {
     "fake-read-microbatch-continuous",
     "fake-read-neither-mode")
   val writeFormats = Seq(
-    "fake-write-microbatch-only",
-    "fake-write-continuous-only",
     "fake-write-microbatch-continuous",
     "fake-write-neither-mode")
   val triggers = Seq(
@@ -151,6 +151,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
       .trigger(trigger)
       .start()
     query.stop()
+    query
   }
 
   private def testNegativeCase(
@@ -184,6 +185,24 @@ class StreamingDataSourceV2Suite extends StreamTest {
     }
   }
 
+  test("disabled v2 write") {
+    // Ensure the V2 path works normally and generates a V2 sink.
+    val v2Query = testPositiveCase(
+      "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
+    assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+      .isInstanceOf[FakeWriteV1Fallback])
+
+    // Ensure we create a V1 sink with the config. Note the config is a comma separated
+    // list, including other fake entries.
+    val fullSinkName = "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback"
+    withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") {
+      val v1Query = testPositiveCase(
+        "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
+      assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+        .isInstanceOf[FakeSink])
+    }
+  }
+
   // Get a list of (read, write, trigger) tuples for test cases.
   val cases = readFormats.flatMap { read =>
     writeFormats.flatMap { write =>
@@ -199,12 +218,12 @@ class StreamingDataSourceV2Suite extends StreamTest {
       val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).newInstance()
       (readSource, writeSource, trigger) match {
         // Valid microbatch queries.
-        case (_: MicroBatchReadSupport, _: MicroBatchWriteSupport, t)
+        case (_: MicroBatchReadSupport, _: StreamWriteSupport, t)
           if !t.isInstanceOf[ContinuousTrigger] =>
           testPositiveCase(read, write, trigger)
 
         // Valid continuous queries.
-        case (_: ContinuousReadSupport, _: ContinuousWriteSupport, _: ContinuousTrigger) =>
+        case (_: ContinuousReadSupport, _: StreamWriteSupport, _: ContinuousTrigger) =>
           testPositiveCase(read, write, trigger)
 
         // Invalid - can't read at all
@@ -214,31 +233,18 @@ class StreamingDataSourceV2Suite extends StreamTest {
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support streamed reading")
 
-        // Invalid - trigger is continuous but writer is not
-        case (_, w, _: ContinuousTrigger) if !w.isInstanceOf[ContinuousWriteSupport] =>
-          testNegativeCase(read, write, trigger,
-            s"Data source $write does not support continuous writing")
-
-        // Invalid - can't write at all
-        case (_, w, _)
-            if !w.isInstanceOf[MicroBatchWriteSupport]
-              && !w.isInstanceOf[ContinuousWriteSupport] =>
+        // Invalid - can't write
+        case (_, w, _) if !w.isInstanceOf[StreamWriteSupport] =>
           testNegativeCase(read, write, trigger,
             s"Data source $write does not support streamed writing")
 
-        // Invalid - trigger and writer are continuous but reader is not
-        case (r, _: ContinuousWriteSupport, _: ContinuousTrigger)
+        // Invalid - trigger is continuous but reader is not
+        case (r, _: StreamWriteSupport, _: ContinuousTrigger)
             if !r.isInstanceOf[ContinuousReadSupport] =>
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support continuous processing")
 
-        // Invalid - trigger is microbatch but writer is not
-        case (_, w, t)
-            if !w.isInstanceOf[MicroBatchWriteSupport] && !t.isInstanceOf[ContinuousTrigger] =>
-          testNegativeCase(read, write, trigger,
-            s"Data source $write does not support streamed writing")
-
-        // Invalid - trigger and writer are microbatch but reader is not
+        // Invalid - trigger is microbatch but reader is not
         case (r, _, t)
            if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] =>
           testPostCreationNegativeCase(read, write, trigger,

