Posted to commits@spark.apache.org by we...@apache.org on 2019/04/26 07:44:52 UTC

[spark] branch master updated: [SPARK-27190][SQL] add table capability for streaming

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 85fd552  [SPARK-27190][SQL] add table capability for streaming
85fd552 is described below

commit 85fd552ed6304967f25574baef3cf9657957bcb1
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Apr 26 15:44:23 2019 +0800

    [SPARK-27190][SQL] add table capability for streaming
    
    ## What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/24012, adding the corresponding table capabilities for streaming.
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #24129 from cloud-fan/capability.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  11 +-
 .../sql/sources/v2/SupportsContinuousRead.java     |  35 ------
 .../sql/sources/v2/SupportsMicroBatchRead.java     |  35 ------
 .../sql/sources/v2/SupportsStreamingWrite.java     |  34 ------
 .../spark/sql/sources/v2/TableCapability.java      |  19 +++
 .../apache/spark/sql/sources/v2/reader/Scan.java   |  10 +-
 .../datasources/noop/NoopDataSource.scala          |   7 +-
 .../v2/V2StreamingScanSupportCheck.scala           |  64 ++++++++++
 .../execution/streaming/MicroBatchExecution.scala  |  61 +++++-----
 .../sql/execution/streaming/StreamExecution.scala  |   4 +-
 .../spark/sql/execution/streaming/console.scala    |   9 +-
 .../streaming/continuous/ContinuousExecution.scala |  22 ++--
 .../spark/sql/execution/streaming/memory.scala     |   9 +-
 .../streaming/sources/ForeachWriterTable.scala     |  12 +-
 .../streaming/sources/RateStreamProvider.scala     |   9 +-
 .../sources/TextSocketSourceProvider.scala         |   9 +-
 .../sql/execution/streaming/sources/memoryV2.scala |  10 +-
 .../sql/internal/BaseSessionStateBuilder.scala     |   3 +-
 .../spark/sql/streaming/DataStreamReader.scala     |   5 +-
 .../spark/sql/streaming/DataStreamWriter.scala     |   7 +-
 .../sql/streaming/StreamingQueryManager.scala      |   6 +-
 .../v2/V2StreamingScanSupportCheckSuite.scala      | 130 +++++++++++++++++++++
 .../sources/StreamingDataSourceV2Suite.scala       | 106 ++++++++++-------
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   3 +-
 24 files changed, 389 insertions(+), 231 deletions(-)
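
For orientation before the diff: this commit deletes the SupportsMicroBatchRead, SupportsContinuousRead and SupportsStreamingWrite marker interfaces and has tables report the new MICRO_BATCH_READ, CONTINUOUS_READ and STREAMING_WRITE capabilities instead. A minimal read-side sketch in the new style, modeled on the KafkaTable and RateStreamTable changes below (MyStreamingSource and its schema are illustrative, not part of the patch):

    import java.util

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.sources.v2.{SupportsRead, Table, TableCapability}
    import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    class MyStreamingSource extends Table with SupportsRead {

      override def name(): String = "my-streaming-source"

      override def schema(): StructType = new StructType().add("value", "string")

      // Declare the supported streaming modes instead of mixing in the
      // deleted SupportsMicroBatchRead / SupportsContinuousRead interfaces.
      override def capabilities(): util.Set[TableCapability] = {
        Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
      }

      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
        override def readSchema(): StructType = schema()
        // toMicroBatchStream / toContinuousStream would be overridden here, one for
        // each capability declared above (see the Scan.java notes further down).
      }
    }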

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index f7a2032..bb76a30 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.{Collections, Locale, UUID}
+import java.util.{Locale, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -29,9 +29,10 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-    with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
+    with SupportsRead with SupportsWrite with BaseStreamingSink {
 
     override def name(): String = s"Kafka $strategy"
 
     override def schema(): StructType = KafkaOffsetReader.kafkaSchema
 
-    override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()
+    override def capabilities(): ju.Set[TableCapability] = {
+      Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
+    }
 
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
       () => new KafkaScan(options)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
deleted file mode 100644
index 5cc9848..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.reader.Scan;
-import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/**
- * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
- * continuous mode.
- * <p>
- * If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
- * that builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented.
- * </p>
- */
-@Evolving
-public interface SupportsContinuousRead extends SupportsRead { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
deleted file mode 100644
index c98f3f1..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.reader.Scan;
-import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/**
- * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
- * micro-batch mode.
- * <p>
- * If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
- * that builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented.
- * </p>
- */
-@Evolving
-public interface SupportsMicroBatchRead extends SupportsRead { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
deleted file mode 100644
index ac11e48..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/**
- * An empty mix-in interface for {@link Table}, to indicate this table supports streaming write.
- * <p>
- * If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a
- * {@link WriteBuilder} with {@link WriteBuilder#buildForStreaming()} implemented.
- * </p>
- */
-@Evolving
-public interface SupportsStreamingWrite extends SupportsWrite, BaseStreamingSink { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
index 8d3fdcd..4640c61 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
@@ -34,6 +34,16 @@ public enum TableCapability {
   BATCH_READ,
 
   /**
+   * Signals that the table supports reads in micro-batch streaming execution mode.
+   */
+  MICRO_BATCH_READ,
+
+  /**
+   * Signals that the table supports reads in continuous streaming execution mode.
+   */
+  CONTINUOUS_READ,
+
+  /**
    * Signals that the table supports append writes in batch execution mode.
    * <p>
    * Tables that return this capability must support appending data and may also support additional
@@ -43,6 +53,15 @@ public enum TableCapability {
   BATCH_WRITE,
 
   /**
+   * Signals that the table supports append writes in streaming execution mode.
+   * <p>
+   * Tables that return this capability must support appending data and may also support additional
+   * write modes, like {@link #TRUNCATE}, {@link #OVERWRITE_BY_FILTER}, and
+   * {@link #OVERWRITE_DYNAMIC}.
+   */
+  STREAMING_WRITE,
+
+  /**
    * Signals that the table can be truncated in a write operation.
    * <p>
    * Truncating a table removes all existing rows.
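
On the write side, a corresponding sketch modeled on the ConsoleTable and ForeachWriterTable changes further down; STREAMING_WRITE replaces the deleted SupportsStreamingWrite marker, and the sink still mixes in BaseStreamingSink so the stream execution classes can accept it (MyStreamingSink is illustrative and not part of the patch; the WriteBuilder is elided):

    import java.util

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.execution.streaming.BaseStreamingSink
    import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}
    import org.apache.spark.sql.sources.v2.writer.WriteBuilder
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    class MyStreamingSink extends Table with SupportsWrite with BaseStreamingSink {

      override def name(): String = "my-streaming-sink"

      // Streaming sinks typically do not expose a fixed schema.
      override def schema(): StructType = StructType(Nil)

      // STREAMING_WRITE replaces the deleted SupportsStreamingWrite marker interface.
      override def capabilities(): util.Set[TableCapability] = {
        Set(TableCapability.STREAMING_WRITE).asJava
      }

      // The real builder must implement WriteBuilder#buildForStreaming(); elided here.
      override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
        throw new UnsupportedOperationException("plug in a WriteBuilder with buildForStreaming()")
    }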
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
index f6085b9..c3964e2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
@@ -21,8 +21,6 @@ import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
-import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableCapability;
 
@@ -74,8 +72,8 @@ public interface Scan {
   /**
    * Returns the physical representation of this scan for streaming query with micro-batch mode. By
    * default this method throws exception, data sources must overwrite this method to provide an
-   * implementation, if the {@link Table} that creates this scan implements
-   * {@link SupportsMicroBatchRead}.
+   * implementation, if the {@link Table} that creates this scan returns
+   * {@link TableCapability#MICRO_BATCH_READ} support in its {@link Table#capabilities()}.
    *
    * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
    *                           recovery. Data streams for the same logical source in the same query
@@ -90,8 +88,8 @@ public interface Scan {
   /**
    * Returns the physical representation of this scan for streaming query with continuous mode. By
    * default this method throws exception, data sources must overwrite this method to provide an
-   * implementation, if the {@link Table} that creates this scan implements
-   * {@link SupportsContinuousRead}.
+   * implementation, if the {@link Table} that creates this scan returns
+   * {@link TableCapability#CONTINUOUS_READ} support in its {@link Table#capabilities()}.
    *
    * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
    *                           recovery. Data streams for the same logical source in the same query
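
Put differently, a table that reports MICRO_BATCH_READ and/or CONTINUOUS_READ is expected to produce a Scan that overrides the matching method; a sketch (not from the patch), with the actual stream implementations left as placeholders:

    import org.apache.spark.sql.sources.v2.reader.Scan
    import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
    import org.apache.spark.sql.types.StructType

    class MyStreamingScan(schema: StructType) extends Scan {

      override def readSchema(): StructType = schema

      // Needed because the owning Table reports MICRO_BATCH_READ.
      override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
        ??? // return the source's MicroBatchStream; checkpointLocation holds recovery state

      // Needed because the owning Table reports CONTINUOUS_READ.
      override def toContinuousStream(checkpointLocation: String): ContinuousStream =
        ??? // return the source's ContinuousStream
    }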
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 96a78d3..c8b2f65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
@@ -39,11 +40,13 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
   override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
 }
 
-private[noop] object NoopTable extends Table with SupportsWrite with SupportsStreamingWrite {
+private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink {
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
   override def name(): String = "noop-table"
   override def schema(): StructType = new StructType()
-  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
+  }
 }
 
 private[noop] object NoopWriteBuilder extends WriteBuilder
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala
new file mode 100644
index 0000000..c029acc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.v2.TableCapability.{CONTINUOUS_READ, MICRO_BATCH_READ}
+
+/**
+ * This rule adds basic table capability checks for streaming scans, without knowing the actual
+ * streaming execution mode.
+ */
+object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) {
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): Unit = {
+    plan.foreach {
+      case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
+        throw new AnalysisException(
+          s"Table ${r.table.name()} does not support either micro-batch or continuous scan.")
+      case _ =>
+    }
+
+    val streamingSources = plan.collect {
+      case r: StreamingRelationV2 => r.table
+    }
+    val v1StreamingRelations = plan.collect {
+      case r: StreamingRelation => r
+    }
+
+    if (streamingSources.length + v1StreamingRelations.length > 1) {
+      val allSupportsMicroBatch = streamingSources.forall(_.supports(MICRO_BATCH_READ))
+      // v1 streaming data source only supports micro-batch.
+      val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) &&
+        v1StreamingRelations.isEmpty
+      if (!allSupportsMicroBatch && !allSupportsContinuous) {
+        val microBatchSources =
+          streamingSources.filter(_.supports(MICRO_BATCH_READ)).map(_.name()) ++
+            v1StreamingRelations.map(_.sourceName)
+        val continuousSources = streamingSources.filter(_.supports(CONTINUOUS_READ)).map(_.name())
+        throw new AnalysisException(
+          "The streaming sources in a query do not have a common supported execution mode.\n" +
+            "Sources support micro-batch: " + microBatchSources.mkString(", ") + "\n" +
+            "Sources support continuous: " + continuousSources.mkString(", "))
+      }
+    }
+  }
+}
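
As exercised by V2StreamingScanSupportCheckSuite further down, this check rejects, for example, a query that unions a micro-batch-only source with a continuous-only source. A rough sketch, assuming a SparkSession named spark and two registered v2 sources with the made-up short names "micro-only" and "continuous-only":

    // One source only reports MICRO_BATCH_READ, the other only CONTINUOUS_READ.
    val microBatchOnly = spark.readStream.format("micro-only").load()
    val continuousOnly = spark.readStream.format("continuous-only").load()

    // Analysis of the combined query fails with AnalysisException:
    //   "The streaming sources in a query do not have a common supported execution mode."
    val combined = microBatchOnly.union(continuousOnly)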
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index fdd80cc..d9fe836 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.sql.{Dataset, SparkSession}
@@ -78,6 +77,7 @@ class MicroBatchExecution(
     val disabledSources =
       sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
 
+    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
       case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
@@ -88,31 +88,33 @@ class MicroBatchExecution(
           logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
           StreamingExecutionRelation(source, output)(sparkSession)
         })
-      case s @ StreamingRelationV2(ds, dsName, table: SupportsMicroBatchRead, options, output, _)
-          if !disabledSources.contains(ds.getClass.getCanonicalName) =>
-        v2ToRelationMap.getOrElseUpdate(s, {
-          // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          nextSourceId += 1
-          logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
-          // TODO: operator pushdown.
-          val scan = table.newScanBuilder(options).build()
-          val stream = scan.toMicroBatchStream(metadataPath)
-          StreamingDataSourceV2Relation(output, scan, stream)
-        })
-      case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) =>
-        v2ToExecutionRelationMap.getOrElseUpdate(s, {
-          // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          if (v1Relation.isEmpty) {
-            throw new UnsupportedOperationException(
-              s"Data source $dsName does not support microbatch processing.")
-          }
-          val source = v1Relation.get.dataSource.createSource(metadataPath)
-          nextSourceId += 1
-          logInfo(s"Using Source [$source] from DataSourceV2 named '$dsName' [$ds]")
-          StreamingExecutionRelation(source, output)(sparkSession)
-        })
+
+      case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, v1) =>
+        val v2Disabled = disabledSources.contains(src.getClass.getCanonicalName)
+        if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
+          v2ToRelationMap.getOrElseUpdate(s, {
+            // Materialize source to avoid creating it in every batch
+            val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+            nextSourceId += 1
+            logInfo(s"Reading table [$table] from DataSourceV2 named '$srcName' [$src]")
+            // TODO: operator pushdown.
+            val scan = table.newScanBuilder(options).build()
+            val stream = scan.toMicroBatchStream(metadataPath)
+            StreamingDataSourceV2Relation(output, scan, stream)
+          })
+        } else if (v1.isEmpty) {
+          throw new UnsupportedOperationException(
+            s"Data source $srcName does not support microbatch processing.")
+        } else {
+          v2ToExecutionRelationMap.getOrElseUpdate(s, {
+            // Materialize source to avoid creating it in every batch
+            val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+            val source = v1.get.dataSource.createSource(metadataPath)
+            nextSourceId += 1
+            logInfo(s"Using Source [$source] from DataSourceV2 named '$srcName' [$src]")
+            StreamingExecutionRelation(source, output)(sparkSession)
+          })
+        }
     }
     sources = _logicalPlan.collect {
       // v1 source
@@ -122,8 +124,9 @@ class MicroBatchExecution(
     }
     uniqueSources = sources.distinct
 
+    // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     sink match {
-      case s: SupportsStreamingWrite =>
+      case s: SupportsWrite =>
         val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
         WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)
 
@@ -519,7 +522,7 @@ class MicroBatchExecution(
 
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
-      case _: SupportsStreamingWrite =>
+      case _: SupportsWrite =>
         newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
@@ -550,7 +553,7 @@ class MicroBatchExecution(
       SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
         sink match {
           case s: Sink => s.addBatch(currentBatchId, nextBatch)
-          case _: SupportsStreamingWrite =>
+          case _: SupportsWrite =>
             // This doesn't accumulate any data - it just forces execution of the microbatch writer.
             nextBatch.collect()
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index cc44193..fd95961 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
+import org.apache.spark.sql.sources.v2.SupportsWrite
 import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming._
@@ -582,7 +582,7 @@ abstract class StreamExecution(
   }
 
   protected def createStreamingWrite(
-      table: SupportsStreamingWrite,
+      table: SupportsWrite,
       options: Map[String, String],
       inputPlan: LogicalPlan): StreamingWrite = {
     val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 884b92a..c7161d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.util
-import java.util.Collections
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
@@ -60,13 +61,15 @@ class ConsoleSinkProvider extends TableProvider
   def shortName(): String = "console"
 }
 
-object ConsoleTable extends Table with SupportsStreamingWrite {
+object ConsoleTable extends Table with SupportsWrite with BaseStreamingSink {
 
   override def name(): String = "console"
 
   override def schema(): StructType = StructType(Nil)
 
-  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(TableCapability.STREAMING_WRITE).asJava
+  }
 
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
     new WriteBuilder with SupportsTruncate {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 4c13e72..a1fb212 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.{SupportsContinuousRead, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, TableCapability}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock
@@ -42,14 +42,14 @@ class ContinuousExecution(
     name: String,
     checkpointRoot: String,
     analyzedPlan: LogicalPlan,
-    sink: SupportsStreamingWrite,
+    sink: SupportsWrite,
     trigger: Trigger,
     triggerClock: Clock,
     outputMode: OutputMode,
     extraOptions: Map[String, String],
     deleteCheckpointOnStop: Boolean)
   extends StreamExecution(
-    sparkSession, name, checkpointRoot, analyzedPlan, sink,
+    sparkSession, name, checkpointRoot, analyzedPlan, sink.asInstanceOf[BaseStreamingSink],
     trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
 
   @volatile protected var sources: Seq[ContinuousStream] = Seq()
@@ -63,22 +63,23 @@ class ContinuousExecution(
   override val logicalPlan: WriteToContinuousDataSource = {
     val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
     var nextSourceId = 0
+    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
-      case s @ StreamingRelationV2(
-          ds, dsName, table: SupportsContinuousRead, options, output, _) =>
+      case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _) =>
+        if (!table.supports(TableCapability.CONTINUOUS_READ)) {
+          throw new UnsupportedOperationException(
+            s"Data source $sourceName does not support continuous processing.")
+        }
+
         v2ToRelationMap.getOrElseUpdate(s, {
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           nextSourceId += 1
-          logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
+          logInfo(s"Reading table [$table] from DataSourceV2 named '$sourceName' [$ds]")
           // TODO: operator pushdown.
           val scan = table.newScanBuilder(options).build()
           val stream = scan.toContinuousStream(metadataPath)
           StreamingDataSourceV2Relation(output, scan, stream)
         })
-
-      case StreamingRelationV2(_, sourceName, _, _, _, _) =>
-        throw new UnsupportedOperationException(
-          s"Data source $sourceName does not support continuous processing.")
     }
 
     sources = _logicalPlan.collect {
@@ -86,6 +87,7 @@ class ContinuousExecution(
     }
     uniqueSources = sources.distinct
 
+    // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     WriteToContinuousDataSource(
       createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index bfa9c09..0dcbdd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.util
-import java.util.Collections
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.NonFatal
 
@@ -92,14 +92,15 @@ object MemoryStreamTableProvider extends TableProvider {
   }
 }
 
-class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table
-  with SupportsMicroBatchRead with SupportsContinuousRead {
+class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table with SupportsRead {
 
   override def name(): String = "MemoryStreamDataSource"
 
   override def schema(): StructType = stream.fullSchema()
 
-  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+  }
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     new MemoryStreamScanBuilder(stream)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 807e0b1..838ede6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -18,14 +18,16 @@
 package org.apache.spark.sql.execution.streaming.sources
 
 import java.util
-import java.util.Collections
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{ForeachWriter, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.python.PythonForeachWriter
-import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, Table, TableCapability}
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink
+import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
@@ -42,13 +44,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 case class ForeachWriterTable[T](
     writer: ForeachWriter[T],
     converter: Either[ExpressionEncoder[T], InternalRow => T])
-  extends Table with SupportsStreamingWrite {
+  extends Table with SupportsWrite with BaseStreamingSink {
 
   override def name(): String = "ForeachSink"
 
   override def schema(): StructType = StructType(Nil)
 
-  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(TableCapability.STREAMING_WRITE).asJava
+  }
 
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
     new WriteBuilder with SupportsTruncate {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index a652eeb..f61e9db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution.streaming.sources
 
 import java.util
-import java.util.Collections
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.SparkSession
@@ -78,7 +79,7 @@ class RateStreamTable(
     rowsPerSecond: Long,
     rampUpTimeSeconds: Long,
     numPartitions: Int)
-  extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
+  extends Table with SupportsRead {
 
   override def name(): String = {
     s"RateStream(rowsPerSecond=$rowsPerSecond, rampUpTimeSeconds=$rampUpTimeSeconds, " +
@@ -87,7 +88,9 @@ class RateStreamTable(
 
   override def schema(): StructType = RateStreamProvider.SCHEMA
 
-  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+  }
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
     override def readSchema(): StructType = RateStreamProvider.SCHEMA
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index a0452cf..0f807e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.streaming.sources
 
 import java.text.SimpleDateFormat
 import java.util
-import java.util.{Collections, Locale}
+import java.util.Locale
 
+import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
@@ -67,7 +68,7 @@ class TextSocketSourceProvider extends TableProvider with DataSourceRegister wit
 }
 
 class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimestamp: Boolean)
-  extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
+  extends Table with SupportsRead {
 
   override def name(): String = s"Socket[$host:$port]"
 
@@ -79,7 +80,9 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
     }
   }
 
-  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+  }
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
     override def readSchema(): StructType = schema()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 8eb5de0..219e25c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.execution.streaming.sources
 
 import java.util
-import java.util.Collections
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableCapability}
+import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType
@@ -43,13 +43,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Logging {
+class MemorySinkV2 extends Table with SupportsWrite with MemorySinkBase with Logging {
 
   override def name(): String = "MemorySinkV2"
 
   override def schema(): StructType = StructType(Nil)
 
-  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(TableCapability.STREAMING_WRITE).asJava
+  }
 
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
     new WriteBuilder with SupportsTruncate {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 3d29ff3..0275df9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.v2.V2WriteSupportCheck
+import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck}
 import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -177,6 +177,7 @@ abstract class BaseSessionStateBuilder(
         PreReadCheck +:
         HiveOnlyCheck +:
         V2WriteSupportCheck +:
+        V2StreamingScanSupportCheck +:
         customCheckRules
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 01f29cd..01083a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -181,8 +182,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
           case Some(schema) => provider.getTable(dsOptions, schema)
           case _ => provider.getTable(dsOptions)
         }
+        import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
         table match {
-          case _: SupportsMicroBatchRead | _: SupportsContinuousRead =>
+          case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
             Dataset.ofRows(
               sparkSession,
               StreamingRelationV2(
@@ -190,6 +192,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
                 sparkSession))
 
           // fallback to v1
+          // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
           case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 33d032e..d2df3a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableProvider}
+import org.apache.spark.sql.sources.v2.{SupportsWrite, TableProvider}
+import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
@@ -315,8 +316,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
           source = provider, conf = df.sparkSession.sessionState.conf)
         val options = sessionOptions ++ extraOptions
         val dsOptions = new CaseInsensitiveStringMap(options.asJava)
+        import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
         provider.getTable(dsOptions) match {
-          case s: SupportsStreamingWrite => s
+          case table: SupportsWrite if table.supports(STREAMING_WRITE) =>
+            table.asInstanceOf[BaseStreamingSink]
           case _ => createV1Sink()
         }
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 2e019ba..5a08049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution,
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
+import org.apache.spark.sql.sources.v2.SupportsWrite
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
@@ -261,7 +261,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     }
 
     (sink, trigger) match {
-      case (v2Sink: SupportsStreamingWrite, trigger: ContinuousTrigger) =>
+      case (table: SupportsWrite, trigger: ContinuousTrigger) =>
         if (operationCheckEnabled) {
           UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
         }
@@ -270,7 +270,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
           userSpecifiedName.orNull,
           checkpointLocation,
           analyzedPlan,
-          v2Sink,
+          table,
           trigger,
           triggerClock,
           outputMode,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala
new file mode 100644
index 0000000..8a0450f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.plans.logical.Union
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{Offset, Source, StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.StreamSourceProvider
+import org.apache.spark.sql.sources.v2.{Table, TableCapability, TableProvider}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class V2StreamingScanSupportCheckSuite extends SparkFunSuite with SharedSparkSession {
+  import TableCapability._
+
+  private def createStreamingRelation(table: Table, v1Relation: Option[StreamingRelation]) = {
+    StreamingRelationV2(FakeTableProvider, "fake", table, CaseInsensitiveStringMap.empty(),
+      FakeTableProvider.schema.toAttributes, v1Relation)(spark)
+  }
+
+  private def createStreamingRelationV1() = {
+    StreamingRelation(DataSource(spark, classOf[FakeStreamSourceProvider].getName))
+  }
+
+  test("check correct plan") {
+    val plan1 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ), None)
+    val plan2 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None)
+    val plan3 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ, CONTINUOUS_READ), None)
+    val plan4 = createStreamingRelationV1()
+
+    V2StreamingScanSupportCheck(Union(plan1, plan1))
+    V2StreamingScanSupportCheck(Union(plan2, plan2))
+    V2StreamingScanSupportCheck(Union(plan1, plan3))
+    V2StreamingScanSupportCheck(Union(plan2, plan3))
+    V2StreamingScanSupportCheck(Union(plan1, plan4))
+    V2StreamingScanSupportCheck(Union(plan3, plan4))
+  }
+
+  test("table without scan capability") {
+    val e = intercept[AnalysisException] {
+      V2StreamingScanSupportCheck(createStreamingRelation(CapabilityTable(), None))
+    }
+    assert(e.message.contains("does not support either micro-batch or continuous scan"))
+  }
+
+  test("mix micro-batch only and continuous only") {
+    val plan1 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ), None)
+    val plan2 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None)
+
+    val e = intercept[AnalysisException] {
+      V2StreamingScanSupportCheck(Union(plan1, plan2))
+    }
+    assert(e.message.contains(
+      "The streaming sources in a query do not have a common supported execution mode"))
+  }
+
+  test("mix continuous only and v1 relation") {
+    val plan1 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None)
+    val plan2 = createStreamingRelationV1()
+    val e = intercept[AnalysisException] {
+      V2StreamingScanSupportCheck(Union(plan1, plan2))
+    }
+    assert(e.message.contains(
+      "The streaming sources in a query do not have a common supported execution mode"))
+  }
+}
+
+private object FakeTableProvider extends TableProvider {
+  val schema = new StructType().add("i", "int")
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    throw new UnsupportedOperationException
+  }
+}
+
+private case class CapabilityTable(_capabilities: TableCapability*) extends Table {
+  override def name(): String = "capability_test_table"
+  override def schema(): StructType = FakeTableProvider.schema
+  override def capabilities(): util.Set[TableCapability] = _capabilities.toSet.asJava
+}
+
+private class FakeStreamSourceProvider extends StreamSourceProvider {
+  override def sourceSchema(
+      sqlContext: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    "fake" -> FakeTableProvider.schema
+  }
+
+  override def createSource(
+      sqlContext: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    new Source {
+      override def schema: StructType = FakeTableProvider.schema
+      override def getOffset: Option[Offset] = {
+        throw new UnsupportedOperationException
+      }
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        throw new UnsupportedOperationException
+      }
+      override def stop(): Unit = {}
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index f022ede..25a68e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -20,13 +20,16 @@ package org.apache.spark.sql.streaming.sources
 import java.util
 import java.util.Collections
 
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper}
+import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, RateStreamOffset, Sink, StreamingQueryWrapper}
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
 import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage}
@@ -77,24 +80,12 @@ class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
   }
 }
 
-trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
-  override def name(): String = "fake"
-  override def schema(): StructType = StructType(Seq())
-  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
-}
-
-trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
-  override def name(): String = "fake"
-  override def schema(): StructType = StructType(Seq())
-  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
-}
-
-trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
+trait FakeStreamingWriteTable extends Table with SupportsWrite with BaseStreamingSink {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
-  override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(STREAMING_WRITE).asJava
+  }
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
     new FakeWriteBuilder
   }
@@ -110,7 +101,16 @@ class FakeReadMicroBatchOnly
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     LastReadOptions.options = options
-    new FakeMicroBatchReadTable {}
+    new Table with SupportsRead {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Seq())
+      override def capabilities(): util.Set[TableCapability] = {
+        Set(MICRO_BATCH_READ).asJava
+      }
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new FakeScanBuilder
+      }
+    }
   }
 }
 
@@ -124,7 +124,16 @@ class FakeReadContinuousOnly
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     LastReadOptions.options = options
-    new FakeContinuousReadTable {}
+    new Table with SupportsRead {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Seq())
+      override def capabilities(): util.Set[TableCapability] = {
+        Set(CONTINUOUS_READ).asJava
+      }
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new FakeScanBuilder
+      }
+    }
   }
 }
 
@@ -132,7 +141,16 @@ class FakeReadBothModes extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-read-microbatch-continuous"
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
-    new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {}
+    new Table with SupportsRead {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Seq())
+      override def capabilities(): util.Set[TableCapability] = {
+        Set(MICRO_BATCH_READ, CONTINUOUS_READ).asJava
+      }
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new FakeScanBuilder
+      }
+    }
   }
 }
 
@@ -365,39 +383,37 @@ class StreamingDataSourceV2Suite extends StreamTest {
       val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor()
         .newInstance().asInstanceOf[TableProvider].getTable(CaseInsensitiveStringMap.empty())
 
-      (sourceTable, sinkTable, trigger) match {
-        // Valid microbatch queries.
-        case (_: SupportsMicroBatchRead, _: SupportsStreamingWrite, t)
-            if !t.isInstanceOf[ContinuousTrigger] =>
-          testPositiveCase(read, write, trigger)
-
-        // Valid continuous queries.
-        case (_: SupportsContinuousRead, _: SupportsStreamingWrite,
-              _: ContinuousTrigger) =>
-          testPositiveCase(read, write, trigger)
-
+      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+      trigger match {
         // Invalid - can't read at all
-        case (r, _, _) if !r.isInstanceOf[SupportsMicroBatchRead] &&
-            !r.isInstanceOf[SupportsContinuousRead] =>
+        case _ if !sourceTable.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support streamed reading")
 
         // Invalid - can't write
-        case (_, w, _) if !w.isInstanceOf[SupportsStreamingWrite] =>
+        case _ if !sinkTable.supports(STREAMING_WRITE) =>
           testNegativeCase(read, write, trigger,
             s"Data source $write does not support streamed writing")
 
-        // Invalid - trigger is continuous but reader is not
-        case (r, _: SupportsStreamingWrite, _: ContinuousTrigger)
-            if !r.isInstanceOf[SupportsContinuousRead] =>
-          testNegativeCase(read, write, trigger,
-            s"Data source $read does not support continuous processing")
+        case _: ContinuousTrigger =>
+          if (sourceTable.supports(CONTINUOUS_READ)) {
+            // Valid continuous queries.
+            testPositiveCase(read, write, trigger)
+          } else {
+            // Invalid - trigger is continuous but reader is not
+            testNegativeCase(
+              read, write, trigger, s"Data source $read does not support continuous processing")
+          }
 
-        // Invalid - trigger is microbatch but reader is not
-        case (r, _, t) if !r.isInstanceOf[SupportsMicroBatchRead] &&
-            !t.isInstanceOf[ContinuousTrigger] =>
-          testPostCreationNegativeCase(read, write, trigger,
-            s"Data source $read does not support microbatch processing")
+        case microBatchTrigger =>
+          if (sourceTable.supports(MICRO_BATCH_READ)) {
+            // Valid microbatch queries.
+            testPositiveCase(read, write, trigger)
+          } else {
+            // Invalid - trigger is microbatch but reader is not
+            testPostCreationNegativeCase(read, write, trigger,
+              s"Data source $read does not support microbatch processing")
+          }
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 3bca770..70364e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.v2.V2WriteSupportCheck
+import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}
 
@@ -89,6 +89,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
       PreWriteCheck +:
         PreReadCheck +:
         V2WriteSupportCheck +:
+        V2StreamingScanSupportCheck +:
         customCheckRules
   }
 

