You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/01/29 08:07:37 UTC

[spark] branch master updated: [SPARK-26695][SQL] data source v2 API refactor - continuous read

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e97ab1d  [SPARK-26695][SQL] data source v2 API refactor - continuous read
e97ab1d is described below

commit e97ab1d9807134bb557ae73920af61e8534b2b08
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Jan 29 00:07:27 2019 -0800

    [SPARK-26695][SQL] data source v2 API refactor - continuous read
    
    ## What changes were proposed in this pull request?
    
    Following https://github.com/apache/spark/pull/23430, this PR does the API refactor for continuous read, w.r.t. the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)
    
    The major changes:
    1. rename `XXXContinuousReadSupport` to `XXXContinuousStream`
    2. at the beginning of continuous streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of `StreamingExecutionRelation`.
    3. remove all the hacks as we have finished all the read side API refactor
    
    ## How was this patch tested?
    
    existing tests
    
    Closes #23619 from cloud-fan/continuous.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 ...adSupport.scala => KafkaContinuousStream.scala} |  90 ++++++---------
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  79 ++++++-------
 .../sql/kafka010/KafkaContinuousSourceSuite.scala  |   8 +-
 .../spark/sql/kafka010/KafkaContinuousTest.scala   |   4 +-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  20 +---
 .../sql/sources/v2/BatchReadSupportProvider.java   |  61 ----------
 .../sources/v2/ContinuousReadSupportProvider.java  |  70 ------------
 .../apache/spark/sql/sources/v2/DataSourceV2.java  |   2 +-
 ...figBuilder.java => SupportsContinuousRead.java} |  18 +--
 .../apache/spark/sql/sources/v2/reader/Batch.java  |   2 +-
 .../sql/sources/v2/reader/BatchReadSupport.java    |  51 ---------
 .../sql/sources/v2/reader/InputPartition.java      |   2 +-
 .../v2/reader/OldSupportsReportPartitioning.java   |  38 -------
 .../v2/reader/OldSupportsReportStatistics.java     |  38 -------
 .../spark/sql/sources/v2/reader/ReadSupport.java   |  50 --------
 .../apache/spark/sql/sources/v2/reader/Scan.java   |  22 +++-
 .../spark/sql/sources/v2/reader/ScanConfig.java    |  45 --------
 .../v2/reader/SupportsPushDownRequiredColumns.java |   2 +-
 .../v2/reader/SupportsReportPartitioning.java      |   6 +-
 .../v2/reader/SupportsReportStatistics.java        |   4 +-
 .../v2/reader/streaming/ContinuousReadSupport.java |  77 -------------
 ...MicroBatchStream.java => ContinuousStream.java} |  43 ++++---
 .../v2/reader/streaming/MicroBatchStream.java      |   2 +-
 .../sql/sources/v2/reader/streaming/Offset.java    |   2 +-
 .../v2/reader/streaming/SparkDataStream.java       |   3 +-
 .../v2/reader/streaming/StreamingReadSupport.java  |  52 ---------
 .../execution/datasources/v2/BatchScanExec.scala   |  49 ++++++++
 .../datasources/v2/ContinuousScanExec.scala        |  99 ++++------------
 .../datasources/v2/DataSourceV2Relation.scala      |  48 +-------
 ...anExec.scala => DataSourceV2ScanExecBase.scala} |  39 ++-----
 .../datasources/v2/DataSourceV2Strategy.scala      |  16 ++-
 .../datasources/v2/MicroBatchScanExec.scala        |  55 ++-------
 .../execution/streaming/MicroBatchExecution.scala  |   2 +-
 .../SimpleStreamingScanConfigBuilder.scala         |  40 -------
 .../execution/streaming/StreamingRelation.scala    |  26 +----
 .../streaming/continuous/ContinuousExecution.scala | 126 +++++++++------------
 .../continuous/ContinuousRateStreamSource.scala    |  26 ++---
 .../continuous/ContinuousTextSocketSource.scala    |  39 ++-----
 .../streaming/continuous/EpochCoordinator.scala    |  10 +-
 .../spark/sql/execution/streaming/memory.scala     |  33 +++---
 .../streaming/sources/ContinuousMemoryStream.scala |  27 +----
 .../streaming/sources/RateStreamProvider.scala     |  18 ++-
 .../sources/TextSocketMicroBatchStream.scala       |   8 +-
 .../sources/TextSocketSourceProvider.scala         |  25 ++--
 .../spark/sql/streaming/DataStreamReader.scala     |  35 +-----
 .../sources/RateStreamProviderSuite.scala          |  29 ++---
 .../streaming/sources/TextSocketStreamSuite.scala  |  61 +++++-----
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   |   8 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |  12 +-
 .../apache/spark/sql/streaming/StreamTest.scala    |  11 +-
 .../spark/sql/streaming/StreamingQuerySuite.scala  |   4 +-
 .../ContinuousQueuedDataReaderSuite.scala          |   4 +-
 .../sql/streaming/continuous/ContinuousSuite.scala |   2 +-
 .../continuous/EpochCoordinatorSuite.scala         |   6 +-
 .../sources/StreamingDataSourceV2Suite.scala       |  88 ++++++--------
 55 files changed, 474 insertions(+), 1263 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
similarity index 84%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index f328567..0e61717 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -30,10 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.types.StructType
 
 /**
- * A [[ContinuousReadSupport]] for data from kafka.
+ * A [[ContinuousStream]] for data from kafka.
  *
  * @param offsetReader  a reader used to get kafka offsets. Note that the actual data will be
  *                      read by per-task consumers generated later.
@@ -46,17 +45,23 @@ import org.apache.spark.sql.types.StructType
  *                       scenarios, where some offsets after the specified initial ones can't be
  *                       properly read.
  */
-class KafkaContinuousReadSupport(
+class KafkaContinuousStream(
     offsetReader: KafkaOffsetReader,
     kafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
     initialOffsets: KafkaOffsetRangeLimit,
     failOnDataLoss: Boolean)
-  extends ContinuousReadSupport with Logging {
+  extends ContinuousStream with Logging {
 
   private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
 
+  // Initialized when creating reader factories. If this diverges from the partitions at the latest
+  // offsets, we need to reconfigure.
+  // Exposed outside this object only for unit tests.
+  @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
+
+
   override def initialOffset(): Offset = {
     val offsets = initialOffsets match {
       case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
@@ -67,18 +72,32 @@ class KafkaContinuousReadSupport(
     offsets
   }
 
-  override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
-    new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss)
-  }
-
   override def deserializeOffset(json: String): Offset = {
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
 
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
-    val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
+
+    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
+    val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
+    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+
+    val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
+    if (deletedPartitions.nonEmpty) {
+      val message = if (
+        offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+      } else {
+        s"$deletedPartitions are gone. Some data may have been missed."
+      }
+      reportDataLoss(message)
+    }
+
+    val startOffsets = newPartitionOffsets ++
+      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
+    knownPartitions = startOffsets.keySet
+
     startOffsets.toSeq.map {
       case (topicPartition, start) =>
         KafkaContinuousInputPartition(
@@ -86,8 +105,7 @@ class KafkaContinuousReadSupport(
     }.toArray
   }
 
-  override def createContinuousReaderFactory(
-      config: ScanConfig): ContinuousPartitionReaderFactory = {
+  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
     KafkaContinuousReaderFactory
   }
 
@@ -105,8 +123,7 @@ class KafkaContinuousReadSupport(
     KafkaSourceOffset(mergedMap)
   }
 
-  override def needsReconfiguration(config: ScanConfig): Boolean = {
-    val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
+  override def needsReconfiguration(): Boolean = {
     offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }
 
@@ -151,47 +168,6 @@ object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory {
   }
 }
 
-class KafkaContinuousScanConfigBuilder(
-    schema: StructType,
-    startOffset: Offset,
-    offsetReader: KafkaOffsetReader,
-    reportDataLoss: String => Unit)
-  extends ScanConfigBuilder {
-
-  override def build(): ScanConfig = {
-    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset)
-
-    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
-    val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
-    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-
-    val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
-    if (deletedPartitions.nonEmpty) {
-      val message = if (
-          offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
-    }
-
-    val startOffsets = newPartitionOffsets ++
-      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
-    KafkaContinuousScanConfig(schema, startOffsets)
-  }
-}
-
-case class KafkaContinuousScanConfig(
-    readSchema: StructType,
-    startOffsets: Map[TopicPartition, Long])
-  extends ScanConfig {
-
-  // Created when building the scan config builder. If this diverges from the partitions at the
-  // latest offsets, we need to reconfigure the kafka read support.
-  def knownPartitions: Set[TopicPartition] = startOffsets.keySet
-}
-
 /**
  * A per-task data reader for continuous Kafka processing.
  *
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 58c90b8..9238899b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -48,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with RelationProvider
     with CreatableRelationProvider
     with StreamingWriteSupportProvider
-    with ContinuousReadSupportProvider
     with TableProvider
     with Logging {
   import KafkaSourceProvider._
@@ -108,46 +107,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
-   * Kafka data in a continuous streaming query.
-   */
-  override def createContinuousReadSupport(
-      metadataPath: String,
-      options: DataSourceOptions): KafkaContinuousReadSupport = {
-    val parameters = options.asMap().asScala.toMap
-    validateStreamOptions(parameters)
-    // Each running query should use its own group id. Otherwise, the query may be only assigned
-    // partial data since Kafka will assign partitions to multiple consumers having the same group
-    // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
-
-    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-    val specifiedKafkaParams =
-      parameters
-        .keySet
-        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-        .map { k => k.drop(6).toString -> parameters(k) }
-        .toMap
-
-    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
-      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
-    val kafkaOffsetReader = new KafkaOffsetReader(
-      strategy(caseInsensitiveParams),
-      kafkaParamsForDriver(specifiedKafkaParams),
-      parameters,
-      driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
-    new KafkaContinuousReadSupport(
-      kafkaOffsetReader,
-      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
-      parameters,
-      metadataPath,
-      startingStreamOffsets,
-      failOnDataLoss(caseInsensitiveParams))
-  }
-
-  /**
    * Returns a new base relation with the given parameters.
    *
    * @note The parameters' keywords are case insensitive and this insensitivity is enforced
@@ -406,7 +365,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-    with SupportsMicroBatchRead {
+    with SupportsMicroBatchRead with SupportsContinuousRead {
 
     override def name(): String = s"Kafka $strategy"
 
@@ -449,6 +408,40 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         startingStreamOffsets,
         failOnDataLoss(caseInsensitiveParams))
     }
+
+    override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+      val parameters = options.asMap().asScala.toMap
+      validateStreamOptions(parameters)
+      // Each running query should use its own group id. Otherwise, the query may be only assigned
+      // partial data since Kafka will assign partitions to multiple consumers having the same group
+      // id. Hence, we should generate a unique id for each query.
+      val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)
+
+      val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
+      val specifiedKafkaParams =
+        parameters
+          .keySet
+          .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
+          .map { k => k.drop(6).toString -> parameters(k) }
+          .toMap
+
+      val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
+        caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+
+      val kafkaOffsetReader = new KafkaOffsetReader(
+        strategy(caseInsensitiveParams),
+        kafkaParamsForDriver(specifiedKafkaParams),
+        parameters,
+        driverGroupIdPrefix = s"$uniqueGroupId-driver")
+
+      new KafkaContinuousStream(
+        kafkaOffsetReader,
+        kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+        parameters,
+        checkpointLocation,
+        startingStreamOffsets,
+        failOnDataLoss(caseInsensitiveParams))
+    }
   }
 }
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index 2f7fd7f..be0cea2 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -209,11 +209,11 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
           assert(
             query.lastExecution.executedPlan.collectFirst {
               case scan: ContinuousScanExec
-                if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-                scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
-            }.exists { config =>
+                if scan.stream.isInstanceOf[KafkaContinuousStream] =>
+                scan.stream.asInstanceOf[KafkaContinuousStream]
+            }.exists { stream =>
               // Ensure the new topic is present and the old topic is gone.
-              config.knownPartitions.exists(_.topic == topic2)
+              stream.knownPartitions.exists(_.topic == topic2)
             },
             s"query never reconfigured to new topic $topic2")
         }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index fa3b623..ad1c2c5 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -48,8 +48,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
       assert(
         query.lastExecution.executedPlan.collectFirst {
           case scan: ContinuousScanExec
-              if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-            scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
+              if scan.stream.isInstanceOf[KafkaContinuousStream] =>
+            scan.stream.asInstanceOf[KafkaContinuousStream]
         }.exists(_.knownPartitions.size == newCount),
         s"query never reconfigured to $newCount partitions")
     }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 90b5015..aa7baac 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -28,14 +28,13 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
 
-import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
-import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation}
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -118,17 +117,10 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
       val sources: Seq[BaseStreamingSource] = {
         query.get.logicalPlan.collect {
           case StreamingExecutionRelation(source: KafkaSource, _) => source
-          case r: StreamingDataSourceV2Relation
-              if r.stream.isInstanceOf[KafkaMicroBatchStream] =>
-            r.stream.asInstanceOf[KafkaMicroBatchStream]
-        } ++ (query.get.lastExecution match {
-          case null => Seq()
-          case e => e.logical.collect {
-            case r: OldStreamingDataSourceV2Relation
-                if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-              r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
-          }
-        })
+          case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
+              r.stream.isInstanceOf[KafkaContinuousStream] =>
+            r.stream
+        }
       }.distinct
 
       if (sources.isEmpty) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
deleted file mode 100644
index 2a4933d..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
-import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for batch processing.
- *
- * This interface is used to create {@link BatchReadSupport} instances when end users run
- * {@code SparkSession.read.format(...).option(...).load()}.
- */
-@Evolving
-public interface BatchReadSupportProvider extends DataSourceV2 {
-
-  /**
-   * Creates a {@link BatchReadSupport} instance to load the data from this data source with a user
-   * specified schema, which is called by Spark at the beginning of each batch query.
-   *
-   * Spark will call this method at the beginning of each batch query to create a
-   * {@link BatchReadSupport} instance.
-   *
-   * By default this method throws {@link UnsupportedOperationException}, implementations should
-   * override this method to handle user specified schema.
-   *
-   * @param schema the user specified schema.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
-    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
-  }
-
-  /**
-   * Creates a {@link BatchReadSupport} instance to scan the data from this data source, which is
-   * called by Spark at the beginning of each batch query.
-   *
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  BatchReadSupport createBatchReadSupport(DataSourceOptions options);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
deleted file mode 100644
index b4f2eb3..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for continuous stream processing.
- *
- * This interface is used to create {@link ContinuousReadSupport} instances when end users run
- * {@code SparkSession.readStream.format(...).option(...).load()} with a continuous trigger.
- */
-@Evolving
-public interface ContinuousReadSupportProvider extends DataSourceV2 {
-
-  /**
-   * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
-   * source with a user specified schema, which is called by Spark at the beginning of each
-   * continuous streaming query.
-   *
-   * By default this method throws {@link UnsupportedOperationException}, implementations should
-   * override this method to handle user specified schema.
-   *
-   * @param schema the user provided schema.
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  default ContinuousReadSupport createContinuousReadSupport(
-      StructType schema,
-      String checkpointLocation,
-      DataSourceOptions options) {
-    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
-  }
-
-  /**
-   * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
-   * source, which is called by Spark at the beginning of each continuous streaming query.
-   *
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  ContinuousReadSupport createContinuousReadSupport(
-      String checkpointLocation,
-      DataSourceOptions options);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
index 4aaa57d..43bdcca 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2;
 import org.apache.spark.annotation.Evolving;
 
 /**
- * TODO: remove it when we finish the API refactor for streaming side.
+ * TODO: remove it when we finish the API refactor for streaming write side.
  */
 @Evolving
 public interface DataSourceV2 {}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
similarity index 59%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
index 4922962..b7fa3f2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
@@ -15,16 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.sources.v2.reader;
+package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
 
 /**
- * An interface for building the {@link ScanConfig}. Implementations can mixin those
- * SupportsPushDownXYZ interfaces to do operator pushdown, and keep the operator pushdown result in
- * the returned {@link ScanConfig}.
+ * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
+ * continuous mode.
+ * <p>
+ * If a {@link Table} implements this interface, the
+ * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
+ * builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented.
+ * </p>
  */
 @Evolving
-public interface ScanConfigBuilder {
-  ScanConfig build();
-}
+public interface SupportsContinuousRead extends SupportsRead { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
index bcfa198..28d80b7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
@@ -42,7 +42,7 @@ public interface Batch {
   InputPartition[] planInputPartitions();
 
   /**
-   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+   * Returns a factory to create a {@link PartitionReader} for each {@link InputPartition}.
    */
   PartitionReaderFactory createReaderFactory();
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
deleted file mode 100644
index 518a8b0..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * An interface that defines how to load the data from data source for batch processing.
- *
- * The execution engine will get an instance of this interface from a data source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
- * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}.
- * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
- * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader
- * factory to scan data from the data source with a Spark job.
- */
-@Evolving
-public interface BatchReadSupport extends ReadSupport {
-
-  /**
-   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
-   * {@link ScanConfig} for each data scanning job.
-   *
-   * The builder can take some query specific information to do operators pushdown, and keep these
-   * information in the created {@link ScanConfig}.
-   *
-   * This is the first step of the data scan. All other methods in {@link BatchReadSupport} needs
-   * to take {@link ScanConfig} as an input.
-   */
-  ScanConfigBuilder newScanConfigBuilder();
-
-  /**
-   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
-   */
-  PartitionReaderFactory createReaderFactory(ScanConfig config);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
index 5f52480..4133497 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.Evolving;
 
 /**
  * A serializable representation of an input partition returned by
- * {@link ReadSupport#planInputPartitions(ScanConfig)}.
+ * {@link Batch#planInputPartitions()} and the corresponding ones in streaming .
  *
  * Note that {@link InputPartition} will be serialized and sent to executors, then
  * {@link PartitionReader} will be created by
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java
deleted file mode 100644
index 347a465..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
-
-/**
- * A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to
- * report data partitioning and try to avoid shuffle at Spark side.
- *
- * Note that, when a {@link ReadSupport} implementation creates exactly one {@link InputPartition},
- * Spark may avoid adding a shuffle even if the reader does not implement this interface.
- */
-@Evolving
-// TODO: remove it, after we finish the API refactor completely.
-public interface OldSupportsReportPartitioning extends ReadSupport {
-
-  /**
-   * Returns the output data partitioning that this reader guarantees.
-   */
-  Partitioning outputPartitioning(ScanConfig config);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java
deleted file mode 100644
index 0d3ec17..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to
- * report statistics to Spark.
- *
- * As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the
- * data source. Implementations that return more accurate statistics based on pushed operators will
- * not improve query performance until the planner can push operators before getting stats.
- */
-@Evolving
-// TODO: remove it, after we finish the API refactor completely.
-public interface OldSupportsReportStatistics extends ReadSupport {
-
-  /**
-   * Returns the estimated statistics of this data source scan.
-   */
-  Statistics estimateStatistics(ScanConfig config);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
deleted file mode 100644
index b1f610a..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * The base interface for all the batch and streaming read supports. Data sources should implement
- * concrete read support interfaces like {@link BatchReadSupport}.
- *
- * If Spark fails to execute any methods in the implementations of this interface (by throwing an
- * exception), the read action will fail and no Spark job will be submitted.
- */
-@Evolving
-public interface ReadSupport {
-
-  /**
-   * Returns the full schema of this data source, which is usually the physical schema of the
-   * underlying storage. This full schema should not be affected by column pruning or other
-   * optimizations.
-   */
-  StructType fullSchema();
-
-  /**
-   * Returns a list of {@link InputPartition input partitions}. Each {@link InputPartition}
-   * represents a data split that can be processed by one Spark task. The number of input
-   * partitions returned here is the same as the number of RDD partitions this scan outputs.
-   *
-   * Note that, this may not be a full scan if the data source supports optimization like filter
-   * push-down. Implementations should check the input {@link ScanConfig} and adjust the resulting
-   * {@link InputPartition input partitions}.
-   */
-  InputPartition[] planInputPartitions(ScanConfig config);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
index c60fb2b..25ab06e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.sources.v2.reader;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
 import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
 import org.apache.spark.sql.sources.v2.Table;
 
@@ -65,7 +67,7 @@ public interface Scan {
    * @throws UnsupportedOperationException
    */
   default Batch toBatch() {
-    throw new UnsupportedOperationException("Batch scans are not supported");
+    throw new UnsupportedOperationException(description() + ": Batch scan are not supported");
   }
 
   /**
@@ -81,6 +83,22 @@ public interface Scan {
    * @throws UnsupportedOperationException
    */
   default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-    throw new UnsupportedOperationException("Micro-batch scans are not supported");
+    throw new UnsupportedOperationException(description() + ": Micro-batch scan are not supported");
+  }
+
+  /**
+   * Returns the physical representation of this scan for streaming query with continuous mode. By
+   * default this method throws exception, data sources must overwrite this method to provide an
+   * implementation, if the {@link Table} that creates this scan implements
+   * {@link SupportsContinuousRead}.
+   *
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Data streams for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   *
+   * @throws UnsupportedOperationException
+   */
+  default ContinuousStream toContinuousStream(String checkpointLocation) {
+    throw new UnsupportedOperationException(description() + ": Continuous scan are not supported");
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
deleted file mode 100644
index c8cff68..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * An interface that carries query specific information for the data scanning job, like operator
- * pushdown information and streaming query offsets. This is defined as an empty interface, and data
- * sources should define their own {@link ScanConfig} classes.
- *
- * For APIs that take a {@link ScanConfig} as input, like
- * {@link ReadSupport#planInputPartitions(ScanConfig)},
- * {@link BatchReadSupport#createReaderFactory(ScanConfig)} and
- * {@link OldSupportsReportStatistics#estimateStatistics(ScanConfig)}, implementations mostly need
- * to cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source.
- */
-@Evolving
-public interface ScanConfig {
-
-  /**
-   * Returns the actual schema of this data source reader, which may be different from the physical
-   * schema of the underlying storage, as column pruning or other optimizations may happen.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   */
-  StructType readSchema();
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
index 60e71c5..862bd14 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
@@ -35,7 +35,7 @@ public interface SupportsPushDownRequiredColumns extends ScanBuilder {
    * also OK to do the pruning partially, e.g., a data source may not be able to prune nested
    * fields, and only prune top-level columns.
    *
-   * Note that, {@link ScanConfig#readSchema()} implementation should take care of the column
+   * Note that, {@link Scan#readSchema()} implementation should take care of the column
    * pruning applied here.
    */
   void pruneColumns(StructType requiredSchema);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
index ba17581..4ce97bc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
@@ -21,14 +21,14 @@ import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
 
 /**
- * A mix in interface for {@link Batch}. Data sources can implement this interface to
+ * A mix in interface for {@link Scan}. Data sources can implement this interface to
  * report data partitioning and try to avoid shuffle at Spark side.
  *
- * Note that, when a {@link Batch} implementation creates exactly one {@link InputPartition},
+ * Note that, when a {@link Scan} implementation creates exactly one {@link InputPartition},
  * Spark may avoid adding a shuffle even if the reader does not implement this interface.
  */
 @Evolving
-public interface SupportsReportPartitioning extends Batch {
+public interface SupportsReportPartitioning extends Scan {
 
   /**
    * Returns the output data partitioning that this reader guarantees.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
index d9f5fb6..d7364af 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.reader;
 import org.apache.spark.annotation.Evolving;
 
 /**
- * A mix in interface for {@link Batch}. Data sources can implement this interface to
+ * A mix in interface for {@link Scan}. Data sources can implement this interface to
  * report statistics to Spark.
  *
  * As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.Evolving;
  * not improve query performance until the planner can push operators before getting stats.
  */
 @Evolving
-public interface SupportsReportStatistics extends Batch {
+public interface SupportsReportStatistics extends Scan {
 
   /**
    * Returns the estimated statistics of this data source scan.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
deleted file mode 100644
index 2b784ac..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.ScanConfig;
-import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
-
-/**
- * An interface that defines how to load the data from data source for continuous streaming
- * processing.
- *
- * The execution engine will get an instance of this interface from a data source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
- * streaming query, then call {@link #newScanConfigBuilder(Offset)} and create an instance of
- * {@link ScanConfig} for the duration of the streaming query or until
- * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
- * input partitions and reader factory to scan data with a Spark job for its duration. At the end
- * {@link #stop()} will be called when the streaming execution is completed. Note that a single
- * query may have multiple executions due to restart or failure recovery.
- */
-@Evolving
-public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
-
-  /**
-   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
-   * {@link ScanConfig} for each data scanning job.
-   *
-   * The builder can take some query specific information to do operators pushdown, store streaming
-   * offsets, etc., and keep these information in the created {@link ScanConfig}.
-   *
-   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
-   * needs to take {@link ScanConfig} as an input.
-   */
-  ScanConfigBuilder newScanConfigBuilder(Offset start);
-
-  /**
-   * Returns a factory, which produces one {@link ContinuousPartitionReader} for one
-   * {@link InputPartition}.
-   */
-  ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config);
-
-  /**
-   * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
-   * for each partition to a single global offset.
-   */
-  Offset mergeOffsets(PartitionOffset[] offsets);
-
-  /**
-   * The execution engine will call this method in every epoch to determine if new input
-   * partitions need to be generated, which may be required if for example the underlying
-   * source system has had partitions added or removed.
-   *
-   * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport}
-   * instance.
-   */
-  default boolean needsReconfiguration(ScanConfig config) {
-    return false;
-  }
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousStream.java
similarity index 55%
copy from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
copy to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousStream.java
index 2fb3957..fff5b95 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousStream.java
@@ -19,23 +19,16 @@ package org.apache.spark.sql.sources.v2.reader.streaming;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.PartitionReader;
-import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
 import org.apache.spark.sql.sources.v2.reader.Scan;
 
 /**
- * A {@link SparkDataStream} for streaming queries with micro-batch mode.
+ * A {@link SparkDataStream} for streaming queries with continuous mode.
  */
 @Evolving
-public interface MicroBatchStream extends SparkDataStream {
+public interface ContinuousStream extends SparkDataStream {
 
   /**
-   * Returns the most recent offset available.
-   */
-  Offset latestOffset();
-
-  /**
-   * Returns a list of {@link InputPartition input partitions} given the start and end offsets. Each
+   * Returns a list of {@link InputPartition input partitions} given the start offset. Each
    * {@link InputPartition} represents a data split that can be processed by one Spark task. The
    * number of input partitions returned here is the same as the number of RDD partitions this scan
    * outputs.
@@ -44,14 +37,34 @@ public interface MicroBatchStream extends SparkDataStream {
    * and is responsible for creating splits for that filter, which is not a full scan.
    * </p>
    * <p>
-   * This method will be called multiple times, to launch one Spark job for each micro-batch in this
-   * data stream.
+   * This method will be called to launch one Spark job for reading the data stream. It will be
+   * called more than once, if {@link #needsReconfiguration()} returns true and Spark needs to
+   * launch a new job.
    * </p>
    */
-  InputPartition[] planInputPartitions(Offset start, Offset end);
+  InputPartition[] planInputPartitions(Offset start);
+
+  /**
+   * Returns a factory to create a {@link ContinuousPartitionReader} for each
+   * {@link InputPartition}.
+   */
+  ContinuousPartitionReaderFactory createContinuousReaderFactory();
+
+  /**
+   * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
+   * for each partition to a single global offset.
+   */
+  Offset mergeOffsets(PartitionOffset[] offsets);
 
   /**
-   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+   * The execution engine will call this method in every epoch to determine if new input
+   * partitions need to be generated, which may be required if for example the underlying
+   * source system has had partitions added or removed.
+   *
+   * If true, the Spark job to scan this continuous data stream will be interrupted and Spark will
+   * launch it again with a new list of {@link InputPartition input partitions}.
    */
-  PartitionReaderFactory createReaderFactory();
+  default boolean needsReconfiguration() {
+    return false;
+  }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
index 2fb3957..330f07b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
@@ -51,7 +51,7 @@ public interface MicroBatchStream extends SparkDataStream {
   InputPartition[] planInputPartitions(Offset start, Offset end);
 
   /**
-   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+   * Returns a factory to create a {@link PartitionReader} for each {@link InputPartition}.
    */
   PartitionReaderFactory createReaderFactory();
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
index 67bff0c..a066713 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.Evolving;
 
 /**
  * An abstract representation of progress through a {@link MicroBatchStream} or
- * {@link ContinuousReadSupport}.
+ * {@link ContinuousStream}.
  * During execution, offsets provided by the data source implementation will be logged and used as
  * restart checkpoints. Each source should provide an offset implementation which the source can use
  * to reconstruct a position in the stream up to which data has been seen/processed.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java
index 8ea34be..30f38ce 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java
@@ -24,7 +24,8 @@ import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
  * The base interface representing a readable data stream in a Spark streaming query. It's
  * responsible to manage the offsets of the streaming source in the streaming query.
  *
- * Data sources should implement concrete data stream interfaces: {@link MicroBatchStream}.
+ * Data sources should implement concrete data stream interfaces:
+ * {@link MicroBatchStream} and {@link ContinuousStream}.
  */
 @Evolving
 public interface SparkDataStream extends BaseStreamingSource {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
deleted file mode 100644
index 9a8c1bd..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.spark.sql.sources.v2.reader.ReadSupport;
-
-/**
- * A base interface for streaming read support. Data sources should implement concrete streaming
- * read support interfaces: {@link ContinuousReadSupport}.
- * This is exposed for a testing purpose.
- */
-@VisibleForTesting
-public interface StreamingReadSupport extends ReadSupport {
-
-  /**
-   * Returns the initial offset for a streaming query to start reading from. Note that the
-   * streaming data source should not assume that it will start reading from its initial offset:
-   * if Spark is restarting an existing query, it will restart from the check-pointed offset rather
-   * than the initial one.
-   */
-  Offset initialOffset();
-
-  /**
-   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
-   *
-   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
-   */
-  Offset deserializeOffset(String json);
-
-  /**
-   * Informs the source that Spark has completed processing all data for offsets less than or
-   * equal to `end` and will only request offsets greater than `end` in the future.
-   */
-  void commit(Offset end);
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
new file mode 100644
index 0000000..c7fcc67
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Physical plan node for scanning a batch of data from a data source v2.
+ */
+case class BatchScanExec(
+    output: Seq[AttributeReference],
+    @transient scan: Scan) extends DataSourceV2ScanExecBase {
+
+  @transient lazy val batch = scan.toBatch
+
+  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: BatchScanExec => this.batch == other.batch
+    case _ => false
+  }
+
+  override def hashCode(): Int = batch.hashCode()
+
+  override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()
+
+  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
+
+  override lazy val inputRDD: RDD[InternalRow] = {
+    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
index c735b0e..f54ff60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
@@ -20,99 +20,44 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
-import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, Offset}
 
 /**
  * Physical plan node for scanning data from a streaming data source with continuous mode.
  */
-// TODO: merge it and `MicroBatchScanExec`.
 case class ContinuousScanExec(
-    output: Seq[AttributeReference],
-    @transient source: DataSourceV2,
-    @transient options: Map[String, String],
-    @transient pushedFilters: Seq[Expression],
-    @transient readSupport: ReadSupport,
-    @transient scanConfig: ScanConfig)
-  extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
-
-  override def simpleString(maxFields: Int): String = "ScanV2 " + metadataString(maxFields)
+    output: Seq[Attribute],
+    @transient scan: Scan,
+    @transient stream: ContinuousStream,
+    @transient start: Offset) extends DataSourceV2ScanExecBase {
 
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
   override def equals(other: Any): Boolean = other match {
-    case other: ContinuousScanExec =>
-      output == other.output && readSupport.getClass == other.readSupport.getClass &&
-        options == other.options
+    case other: ContinuousScanExec => this.stream == other.stream
     case _ => false
   }
 
-  override def hashCode(): Int = {
-    Seq(output, source, options).hashCode()
-  }
-
-  override def outputPartitioning: physical.Partitioning = readSupport match {
-    case _ if partitions.length == 1 =>
-      SinglePartition
+  override def hashCode(): Int = stream.hashCode()
 
-    case s: OldSupportsReportPartitioning =>
-      new DataSourcePartitioning(
-        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))
+  override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start)
 
-    case _ => super.outputPartitioning
+  override lazy val readerFactory: ContinuousPartitionReaderFactory = {
+    stream.createContinuousReaderFactory()
   }
 
-  private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
-
-  private lazy val readerFactory = readSupport match {
-    case r: ContinuousReadSupport => r.createContinuousReaderFactory(scanConfig)
-    case _ => throw new IllegalStateException("unknown read support: " + readSupport)
-  }
-
-  override val supportsBatch: Boolean = {
-    require(partitions.forall(readerFactory.supportColumnarReads) ||
-      !partitions.exists(readerFactory.supportColumnarReads),
-      "Cannot mix row-based and columnar input partitions.")
-
-    partitions.exists(readerFactory.supportColumnarReads)
-  }
-
-  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
-    case _: ContinuousReadSupport =>
-      assert(!supportsBatch,
-        "continuous stream reader does not support columnar read yet.")
-      EpochCoordinatorRef.get(
-          sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-          sparkContext.env)
-        .askSync[Unit](SetReaderPartitions(partitions.size))
-      new ContinuousDataSourceRDD(
-        sparkContext,
-        sqlContext.conf.continuousStreamingExecutorQueueSize,
-        sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
-        partitions,
-        schema,
-        readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
-
-    case _ =>
-      new DataSourceRDD(
-        sparkContext, partitions, readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch)
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (supportsBatch) {
-      WholeStageCodegenExec(this)(codegenStageId = 0).execute()
-    } else {
-      val numOutputRows = longMetric("numOutputRows")
-      inputRDD.map { r =>
-        numOutputRows += 1
-        r
-      }
-    }
+  override lazy val inputRDD: RDD[InternalRow] = {
+    EpochCoordinatorRef.get(
+      sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      sparkContext.env)
+      .askSync[Unit](SetReaderPartitions(partitions.size))
+    new ContinuousDataSourceRDD(
+      sparkContext,
+      sqlContext.conf.continuousStreamingExecutorQueueSize,
+      sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
+      partitions,
+      schema,
+      readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 63e97e6..47cf26d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.sources.v2._
@@ -58,8 +58,6 @@ case class DataSourceV2Relation(
     case _ => throw new AnalysisException(s"Table is not readable: ${table.name()}")
   }
 
-
-
   def newWriteBuilder(schema: StructType): WriteBuilder = table match {
     case s: SupportsBatchWrite =>
       val dsOptions = new DataSourceOptions(options.asJava)
@@ -94,7 +92,7 @@ case class DataSourceV2Relation(
  */
 case class StreamingDataSourceV2Relation(
     output: Seq[Attribute],
-    scanDesc: String,
+    scan: Scan,
     stream: SparkDataStream,
     startOffset: Option[Offset] = None,
     endOffset: Option[Offset] = None)
@@ -104,7 +102,7 @@ case class StreamingDataSourceV2Relation(
 
   override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
 
-  override def computeStats(): Statistics = stream match {
+  override def computeStats(): Statistics = scan match {
     case r: SupportsReportStatistics =>
       val statistics = r.estimateStatistics()
       Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
@@ -113,46 +111,6 @@ case class StreamingDataSourceV2Relation(
   }
 }
 
-// TODO: remove it after finish API refactor for continuous streaming.
-case class OldStreamingDataSourceV2Relation(
-    output: Seq[AttributeReference],
-    source: DataSourceV2,
-    options: Map[String, String],
-    readSupport: ReadSupport,
-    scanConfigBuilder: ScanConfigBuilder)
-  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
-
-  override def isStreaming: Boolean = true
-
-  override def simpleString(maxFields: Int): String = {
-    "Streaming RelationV2 " + metadataString(maxFields)
-  }
-
-  override def pushedFilters: Seq[Expression] = Nil
-
-  override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
-
-  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
-  override def equals(other: Any): Boolean = other match {
-    case other: OldStreamingDataSourceV2Relation =>
-      output == other.output && readSupport.getClass == other.readSupport.getClass &&
-        options == other.options
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    Seq(output, source, options).hashCode()
-  }
-
-  override def computeStats(): Statistics = readSupport match {
-    case r: OldSupportsReportStatistics =>
-      val statistics = r.estimateStatistics(scanConfigBuilder.build())
-      Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
-    case _ =>
-      Statistics(sizeInBytes = conf.defaultSizeInBytes)
-  }
-}
-
 object DataSourceV2Relation {
   def create(table: Table, options: Map[String, String]): DataSourceV2Relation = {
     val output = table.schema().toAttributes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
similarity index 66%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 53e4e77..da71e78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -19,39 +19,26 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
 
-/**
- * Physical plan node for scanning a batch of data from a data source.
- */
-case class DataSourceV2ScanExec(
-    output: Seq[AttributeReference],
-    scanDesc: String,
-    @transient batch: Batch)
-  extends LeafExecNode with ColumnarBatchScan {
-
-  override def simpleString(maxFields: Int): String = {
-    s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc"
-  }
+trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
 
-  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
-  override def equals(other: Any): Boolean = other match {
-    case other: DataSourceV2ScanExec => this.batch == other.batch
-    case _ => false
-  }
+  def scan: Scan
 
-  override def hashCode(): Int = batch.hashCode()
+  def partitions: Seq[InputPartition]
 
-  private lazy val partitions = batch.planInputPartitions()
+  def readerFactory: PartitionReaderFactory
 
-  private lazy val readerFactory = batch.createReaderFactory()
+  override def simpleString(maxFields: Int): String = {
+    s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
+  }
 
-  override def outputPartitioning: physical.Partitioning = batch match {
+  override def outputPartitioning: physical.Partitioning = scan match {
     case _ if partitions.length == 1 =>
       SinglePartition
 
@@ -70,13 +57,11 @@ case class DataSourceV2ScanExec(
     partitions.exists(readerFactory.supportColumnarReads)
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = {
-    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
-  }
+  def inputRDD: RDD[InternalRow]
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override def doExecute(): RDD[InternalRow] = {
     if (supportsBatch) {
       WholeStageCodegenExec(this)(codegenStageId = 0).execute()
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b4c5471..40ac5cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
 
 object DataSourceV2Strategy extends Strategy {
@@ -117,7 +117,7 @@ object DataSourceV2Strategy extends Strategy {
            |Output: ${output.mkString(", ")}
          """.stripMargin)
 
-      val plan = DataSourceV2ScanExec(output, scan.description(), scan.toBatch)
+      val plan = BatchScanExec(output, scan)
 
       val filterCondition = postScanFilters.reduceLeftOption(And)
       val withFilter = filterCondition.map(FilterExec(_, plan)).getOrElse(plan)
@@ -130,15 +130,14 @@ object DataSourceV2Strategy extends Strategy {
       // ensure there is a projection, which will produce unsafe rows required by some operators
       ProjectExec(r.output,
         MicroBatchScanExec(
-          r.output, r.scanDesc, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
+          r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
 
-    case r: OldStreamingDataSourceV2Relation =>
-      // TODO: support operator pushdown for streaming data sources.
-      val scanConfig = r.scanConfigBuilder.build()
+    case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>
+      val continuousStream = r.stream.asInstanceOf[ContinuousStream]
       // ensure there is a projection, which will produce unsafe rows required by some operators
       ProjectExec(r.output,
         ContinuousScanExec(
-          r.output, r.source, r.options, r.pushedFilters, r.readSupport, scanConfig)) :: Nil
+          r.output, r.scan, continuousStream, r.startOffset.get)) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
@@ -158,8 +157,7 @@ object DataSourceV2Strategy extends Strategy {
 
     case Repartition(1, false, child) =>
       val isContinuous = child.find {
-        case s: OldStreamingDataSourceV2Relation =>
-          s.readSupport.isInstanceOf[ContinuousReadSupport]
+        case r: StreamingDataSourceV2Relation => r.stream.isInstanceOf[ContinuousStream]
         case _ => false
       }.isDefined
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
index feea8bc..d2e33d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
@@ -19,12 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
-import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
-import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
-import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
 
 /**
@@ -32,14 +28,10 @@ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offse
  */
 case class MicroBatchScanExec(
     output: Seq[Attribute],
-    scanDesc: String,
+    @transient scan: Scan,
     @transient stream: MicroBatchStream,
     @transient start: Offset,
-    @transient end: Offset) extends LeafExecNode with ColumnarBatchScan {
-
-  override def simpleString(maxFields: Int): String = {
-    s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc"
-  }
+    @transient end: Offset) extends DataSourceV2ScanExecBase {
 
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
   override def equals(other: Any): Boolean = other match {
@@ -49,44 +41,11 @@ case class MicroBatchScanExec(
 
   override def hashCode(): Int = stream.hashCode()
 
-  private lazy val partitions = stream.planInputPartitions(start, end)
-
-  private lazy val readerFactory = stream.createReaderFactory()
-
-  override def outputPartitioning: physical.Partitioning = stream match {
-    case _ if partitions.length == 1 =>
-      SinglePartition
-
-    case s: SupportsReportPartitioning =>
-      new DataSourcePartitioning(
-        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
-
-    case _ => super.outputPartitioning
-  }
-
-  override def supportsBatch: Boolean = {
-    require(partitions.forall(readerFactory.supportColumnarReads) ||
-      !partitions.exists(readerFactory.supportColumnarReads),
-      "Cannot mix row-based and columnar input partitions.")
+  override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end)
 
-    partitions.exists(readerFactory.supportColumnarReads)
-  }
+  override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
 
-  private lazy val inputRDD: RDD[InternalRow] = {
+  override lazy val inputRDD: RDD[InternalRow] = {
     new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
   }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (supportsBatch) {
-      WholeStageCodegenExec(this)(codegenStageId = 0).execute()
-    } else {
-      val numOutputRows = longMetric("numOutputRows")
-      inputRDD.map { r =>
-        numOutputRows += 1
-        r
-      }
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 792e3a3..2c33975 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -99,7 +99,7 @@ class MicroBatchExecution(
           // TODO: operator pushdown.
           val scan = table.newScanBuilder(dsOptions).build()
           val stream = scan.toMicroBatchStream(metadataPath)
-          StreamingDataSourceV2Relation(output, scan.description(), stream)
+          StreamingDataSourceV2Relation(output, scan, stream)
         })
       case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) =>
         v2ToExecutionRelationMap.getOrElseUpdate(s, {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SimpleStreamingScanConfigBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SimpleStreamingScanConfigBuilder.scala
deleted file mode 100644
index 1be0716..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SimpleStreamingScanConfigBuilder.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.apache.spark.sql.sources.v2.reader.{ScanConfig, ScanConfigBuilder}
-import org.apache.spark.sql.types.StructType
-
-/**
- * A very simple [[ScanConfigBuilder]] implementation that creates a simple [[ScanConfig]] to
- * carry schema and offsets for streaming data sources.
- */
-class SimpleStreamingScanConfigBuilder(
-    schema: StructType,
-    start: Offset,
-    end: Option[Offset] = None)
-  extends ScanConfigBuilder {
-
-  override def build(): ScanConfig = SimpleStreamingScanConfig(schema, start, end)
-}
-
-case class SimpleStreamingScanConfig(
-    readSchema: StructType,
-    start: Offset,
-    end: Option[Offset])
-  extends ScanConfig
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 535fa1c..83d38dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceV2, Table}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, Table}
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
@@ -111,30 +111,6 @@ case class StreamingRelationV2(
 }
 
 /**
- * Used to link a [[DataSourceV2]] into a continuous processing execution.
- */
-case class ContinuousExecutionRelation(
-    source: ContinuousReadSupportProvider,
-    extraOptions: Map[String, String],
-    output: Seq[Attribute])(session: SparkSession)
-  extends LeafNode with MultiInstanceRelation {
-
-  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
-  override def isStreaming: Boolean = true
-  override def toString: String = source.toString
-
-  // There's no sensible value here. On the execution path, this relation will be
-  // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
-  // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
-  // value.
-  override def computeStats(): Statistics = Statistics(
-    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
-  )
-
-  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
-}
-
-/**
  * A dummy physical plan for [[StreamingRelation]] to support
  * [[org.apache.spark.sql.Dataset.explain]]
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index c74fa14..b22795d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -22,20 +22,18 @@ import java.util.concurrent.TimeUnit
 import java.util.function.UnaryOperator
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{ContinuousScanExec, OldStreamingDataSourceV2Relation}
-import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider}
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider, SupportsContinuousRead}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock
 
@@ -54,25 +52,39 @@ class ContinuousExecution(
     sparkSession, name, checkpointRoot, analyzedPlan, sink,
     trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
 
-  @volatile protected var continuousSources: Seq[ContinuousReadSupport] = Seq()
-  override protected def sources: Seq[BaseStreamingSource] = continuousSources
+  @volatile protected var sources: Seq[ContinuousStream] = Seq()
 
   // For use only in test harnesses.
   private[sql] var currentEpochCoordinatorId: String = _
 
   override val logicalPlan: LogicalPlan = {
-    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
-    analyzedPlan.transform {
-      case r @ StreamingRelationV2(
-          source: ContinuousReadSupportProvider, _, _, extraReaderOptions, output, _) =>
-        // TODO: shall we create `ContinuousReadSupport` here instead of each reconfiguration?
-        toExecutionRelationMap.getOrElseUpdate(r, {
-          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
+    var nextSourceId = 0
+    val _logicalPlan = analyzedPlan.transform {
+      case s @ StreamingRelationV2(
+          ds, dsName, table: SupportsContinuousRead, options, output, _) =>
+        v2ToRelationMap.getOrElseUpdate(s, {
+          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+          nextSourceId += 1
+          logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
+          val dsOptions = new DataSourceOptions(options.asJava)
+          // TODO: operator pushdown.
+          val scan = table.newScanBuilder(dsOptions).build()
+          val stream = scan.toContinuousStream(metadataPath)
+          StreamingDataSourceV2Relation(output, scan, stream)
         })
+
       case StreamingRelationV2(_, sourceName, _, _, _, _) =>
         throw new UnsupportedOperationException(
           s"Data source $sourceName does not support continuous processing.")
     }
+
+    sources = _logicalPlan.collect {
+      case r: StreamingDataSourceV2Relation => r.stream.asInstanceOf[ContinuousStream]
+    }
+    uniqueSources = sources.distinct
+
+    _logicalPlan
   }
 
   private val triggerExecutor = trigger match {
@@ -92,6 +104,8 @@ class ContinuousExecution(
     do {
       runContinuous(sparkSessionForStream)
     } while (state.updateAndGet(stateUpdate) == ACTIVE)
+
+    stopSources()
   }
 
   /**
@@ -135,7 +149,7 @@ class ContinuousExecution(
         updateStatusMessage("Starting new streaming query")
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
-        OffsetSeq.fill(continuousSources.map(_ => null): _*)
+        OffsetSeq.fill(sources.map(_ => null): _*)
     }
   }
 
@@ -144,47 +158,17 @@ class ContinuousExecution(
    * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
    */
   private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
-    // A list of attributes that will need to be updated.
-    val replacements = new ArrayBuffer[(Attribute, Attribute)]
-    // Translate from continuous relation to the underlying data source.
-    var nextSourceId = 0
-    continuousSources = logicalPlan.collect {
-      case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
-        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-        nextSourceId += 1
-
-        dataSource.createContinuousReadSupport(
-          metadataPath,
-          new DataSourceOptions(extraReaderOptions.asJava))
-    }
-    uniqueSources = continuousSources.distinct
-
     val offsets = getStartOffsets(sparkSessionForQuery)
 
-    var insertedSourceId = 0
-    val withNewSources = logicalPlan transform {
-      case ContinuousExecutionRelation(source, options, output) =>
-        val readSupport = continuousSources(insertedSourceId)
-        insertedSourceId += 1
-        val newOutput = readSupport.fullSchema().toAttributes
-        val maxFields = SQLConf.get.maxToStringFields
-        assert(output.size == newOutput.size,
-          s"Invalid reader: ${truncatedString(output, ",", maxFields)} != " +
-            s"${truncatedString(newOutput, ",", maxFields)}")
-        replacements ++= output.zip(newOutput)
-
+    val withNewSources: LogicalPlan = logicalPlan transform {
+      case relation: StreamingDataSourceV2Relation =>
         val loggedOffset = offsets.offsets(0)
-        val realOffset = loggedOffset.map(off => readSupport.deserializeOffset(off.json))
-        val startOffset = realOffset.getOrElse(readSupport.initialOffset)
-        val scanConfigBuilder = readSupport.newScanConfigBuilder(startOffset)
-        OldStreamingDataSourceV2Relation(newOutput, source, options, readSupport, scanConfigBuilder)
+        val realOffset = loggedOffset.map(off => relation.stream.deserializeOffset(off.json))
+        val startOffset = realOffset.getOrElse(relation.stream.initialOffset)
+        relation.copy(startOffset = Some(startOffset))
     }
 
-    // Rewire the plan to use the new attributes that were returned by the source.
-    val replacementMap = AttributeMap(replacements)
-    val triggerLogicalPlan = withNewSources transformAllExpressions {
-      case a: Attribute if replacementMap.contains(a) =>
-        replacementMap(a).withMetadata(a.metadata)
+    withNewSources.transformAllExpressions {
       case (_: CurrentTimestamp | _: CurrentDate) =>
         throw new IllegalStateException(
           "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
@@ -192,15 +176,15 @@ class ContinuousExecution(
 
     val writer = sink.createStreamingWriteSupport(
       s"$runId",
-      triggerLogicalPlan.schema,
+      withNewSources.schema,
       outputMode,
       new DataSourceOptions(extraOptions.asJava))
-    val withSink = WriteToContinuousDataSource(writer, triggerLogicalPlan)
+    val planWithSink = WriteToContinuousDataSource(writer, withNewSources)
 
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
         sparkSessionForQuery,
-        withSink,
+        planWithSink,
         outputMode,
         checkpointFile("state"),
         id,
@@ -210,10 +194,9 @@ class ContinuousExecution(
       lastExecution.executedPlan // Force the lazy generation of execution plan
     }
 
-    val (readSupport, scanConfig) = lastExecution.executedPlan.collect {
-      case scan: ContinuousScanExec
-          if scan.readSupport.isInstanceOf[ContinuousReadSupport] =>
-        scan.readSupport.asInstanceOf[ContinuousReadSupport] -> scan.scanConfig
+    val stream = planWithSink.collect {
+      case relation: StreamingDataSourceV2Relation =>
+        relation.stream.asInstanceOf[ContinuousStream]
     }.head
 
     sparkSessionForQuery.sparkContext.setLocalProperty(
@@ -233,16 +216,14 @@ class ContinuousExecution(
     // Use the parent Spark session for the endpoint since it's where this query ID is registered.
     val epochEndpoint =
       EpochCoordinatorRef.create(
-        writer, readSupport, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+        writer, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
     val epochUpdateThread = new Thread(new Runnable {
       override def run: Unit = {
         try {
           triggerExecutor.execute(() => {
             startTrigger()
 
-            val shouldReconfigure = readSupport.needsReconfiguration(scanConfig) &&
-              state.compareAndSet(ACTIVE, RECONFIGURING)
-            if (shouldReconfigure) {
+            if (stream.needsReconfiguration && state.compareAndSet(ACTIVE, RECONFIGURING)) {
               if (queryExecutionThread.isAlive) {
                 queryExecutionThread.interrupt()
               }
@@ -289,7 +270,6 @@ class ContinuousExecution(
       epochUpdateThread.interrupt()
       epochUpdateThread.join()
 
-      stopSources()
       sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
   }
@@ -299,11 +279,11 @@ class ContinuousExecution(
    */
   def addOffset(
       epoch: Long,
-      readSupport: ContinuousReadSupport,
+      stream: ContinuousStream,
       partitionOffsets: Seq[PartitionOffset]): Unit = {
-    assert(continuousSources.length == 1, "only one continuous source supported currently")
+    assert(sources.length == 1, "only one continuous source supported currently")
 
-    val globalOffset = readSupport.mergeOffsets(partitionOffsets.toArray)
+    val globalOffset = stream.mergeOffsets(partitionOffsets.toArray)
     val oldOffset = synchronized {
       offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
       offsetLog.get(epoch - 1)
@@ -329,7 +309,7 @@ class ContinuousExecution(
   def commit(epoch: Long): Unit = {
     updateStatusMessage(s"Committing epoch $epoch")
 
-    assert(continuousSources.length == 1, "only one continuous source supported currently")
+    assert(sources.length == 1, "only one continuous source supported currently")
     assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
 
     synchronized {
@@ -338,9 +318,9 @@ class ContinuousExecution(
       if (queryExecutionThread.isAlive) {
         commitLog.add(epoch, CommitMetadata())
         val offset =
-          continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
-        committedOffsets ++= Seq(continuousSources(0) -> offset)
-        continuousSources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset])
+          sources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
+        committedOffsets ++= Seq(sources(0) -> offset)
+        sources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset])
       } else {
         return
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index a6cde2b..48ff70f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -22,23 +22,22 @@ import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{RateStreamOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder, ValueRunTimeMsPair}
-import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.types.StructType
 
 case class RateStreamPartitionOffset(
    partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
 
-class RateStreamContinuousReadSupport(options: DataSourceOptions) extends ContinuousReadSupport {
+class RateStreamContinuousStream(
+    rowsPerSecond: Long,
+    numPartitions: Int,
+    options: DataSourceOptions) extends ContinuousStream {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 
   val creationTime = System.currentTimeMillis()
 
-  val numPartitions = options.get(RateStreamProvider.NUM_PARTITIONS).orElse("5").toInt
-  val rowsPerSecond = options.get(RateStreamProvider.ROWS_PER_SECOND).orElse("6").toLong
   val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
 
   override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
@@ -54,18 +53,10 @@ class RateStreamContinuousReadSupport(options: DataSourceOptions) extends Contin
     RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json))
   }
 
-  override def fullSchema(): StructType = RateStreamProvider.SCHEMA
-
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
-    new SimpleStreamingScanConfigBuilder(fullSchema(), start)
-  }
-
   override def initialOffset: Offset = createInitialOffset(numPartitions, creationTime)
 
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
-    val startOffset = config.asInstanceOf[SimpleStreamingScanConfig].start
-
-    val partitionStartMap = startOffset match {
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    val partitionStartMap = start match {
       case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
       case off =>
         throw new IllegalArgumentException(
@@ -91,8 +82,7 @@ class RateStreamContinuousReadSupport(options: DataSourceOptions) extends Contin
     }.toArray
   }
 
-  override def createContinuousReaderFactory(
-      config: ScanConfig): ContinuousPartitionReaderFactory = {
+  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
     RateStreamContinuousReaderFactory
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
index 28ab244..e7bc713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
@@ -31,37 +31,29 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming.{Offset => _, _}
 import org.apache.spark.sql.execution.streaming.sources.TextSocketReader
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.RpcUtils
 
 
 /**
- * A ContinuousReadSupport that reads text lines through a TCP socket, designed only for tutorials
- * and debugging. This ContinuousReadSupport will *not* work in production applications due to
+ * A [[ContinuousStream]] that reads text lines through a TCP socket, designed only for tutorials
+ * and debugging. This ContinuousStream will *not* work in production applications due to
  * multiple reasons, including no support for fault recovery.
  *
  * The driver maintains a socket connection to the host-port, keeps the received messages in
  * buckets and serves the messages to the executors via a RPC endpoint.
  */
-class TextSocketContinuousReadSupport(options: DataSourceOptions)
-  extends ContinuousReadSupport with Logging {
+class TextSocketContinuousStream(
+    host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
+  extends ContinuousStream with Logging {
 
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 
-  private val host: String = options.get("host").get()
-  private val port: Int = options.get("port").get().toInt
-
-  assert(SparkSession.getActiveSession.isDefined)
-  private val spark = SparkSession.getActiveSession.get
-  private val numPartitions = spark.sparkContext.defaultParallelism
-
   @GuardedBy("this")
   private var socket: Socket = _
 
@@ -101,21 +93,9 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions)
     startOffset
   }
 
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
-    new SimpleStreamingScanConfigBuilder(fullSchema(), start)
-  }
-
-  override def fullSchema(): StructType = {
-    if (includeTimestamp) {
-      TextSocketReader.SCHEMA_TIMESTAMP
-    } else {
-      TextSocketReader.SCHEMA_REGULAR
-    }
-  }
 
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
-    val startOffset = config.asInstanceOf[SimpleStreamingScanConfig]
-      .start.asInstanceOf[TextSocketOffset]
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    val startOffset = start.asInstanceOf[TextSocketOffset]
     recordEndpoint.setStartOffsets(startOffset.offsets)
     val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
     endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
@@ -140,8 +120,7 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions)
     }.toArray
   }
 
-  override def createContinuousReaderFactory(
-      config: ScanConfig): ContinuousPartitionReaderFactory = {
+  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
     TextSocketReaderFactory
   }
 
@@ -197,7 +176,7 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions)
               logWarning(s"Stream closed by $host:$port")
               return
             }
-            TextSocketContinuousReadSupport.this.synchronized {
+            TextSocketContinuousStream.this.synchronized {
               currentOffset += 1
               val newData = (line,
                 Timestamp.valueOf(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index 2238ce2..d1bda79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.util.RpcUtils
@@ -83,14 +83,14 @@ private[sql] object EpochCoordinatorRef extends Logging {
    */
   def create(
       writeSupport: StreamingWriteSupport,
-      readSupport: ContinuousReadSupport,
+      stream: ContinuousStream,
       query: ContinuousExecution,
       epochCoordinatorId: String,
       startEpoch: Long,
       session: SparkSession,
       env: SparkEnv): RpcEndpointRef = synchronized {
     val coordinator = new EpochCoordinator(
-      writeSupport, readSupport, query, startEpoch, session, env.rpcEnv)
+      writeSupport, stream, query, startEpoch, session, env.rpcEnv)
     val ref = env.rpcEnv.setupEndpoint(endpointName(epochCoordinatorId), coordinator)
     logInfo("Registered EpochCoordinator endpoint")
     ref
@@ -116,7 +116,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
  */
 private[continuous] class EpochCoordinator(
     writeSupport: StreamingWriteSupport,
-    readSupport: ContinuousReadSupport,
+    stream: ContinuousStream,
     query: ContinuousExecution,
     startEpoch: Long,
     session: SparkSession,
@@ -220,7 +220,7 @@ private[continuous] class EpochCoordinator(
         partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
       if (thisEpochOffsets.size == numReaderPartitions) {
         logDebug(s"Epoch $epoch has offsets reported from all partitions: $thisEpochOffsets")
-        query.addOffset(epoch, readSupport, thisEpochOffsets.toSeq)
+        query.addOffset(epoch, stream, thisEpochOffsets.toSeq)
         resolveCommitsAtEpoch(epoch)
       }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 5406679..e71f81c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsMicroBatchRead, Table, TableProvider}
+import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -68,7 +68,15 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas
 
   def fullSchema(): StructType = encoder.schema
 
-  protected def logicalPlan: LogicalPlan
+  protected val logicalPlan: LogicalPlan = {
+    StreamingRelationV2(
+      MemoryStreamTableProvider,
+      "memory",
+      new MemoryStreamTable(this),
+      Map.empty,
+      attributes,
+      None)(sqlContext.sparkSession)
+  }
 
   def addData(data: TraversableOnce[A]): Offset
 }
@@ -81,7 +89,8 @@ object MemoryStreamTableProvider extends TableProvider {
   }
 }
 
-class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table with SupportsMicroBatchRead {
+class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table
+  with SupportsMicroBatchRead with SupportsContinuousRead {
 
   override def name(): String = "MemoryStreamDataSource"
 
@@ -101,7 +110,11 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
   override def readSchema(): StructType = stream.fullSchema()
 
   override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
-    stream.asInstanceOf[MemoryStream[_]]
+    stream.asInstanceOf[MicroBatchStream]
+  }
+
+  override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+    stream.asInstanceOf[ContinuousStream]
   }
 }
 
@@ -113,16 +126,6 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     extends MemoryStreamBase[A](sqlContext) with MicroBatchStream with Logging {
 
-  protected val logicalPlan: LogicalPlan = {
-    StreamingRelationV2(
-      MemoryStreamTableProvider,
-      "memory",
-      new MemoryStreamTable(this),
-      Map.empty,
-      attributes,
-      None)(sqlContext.sparkSession)
-  }
-
   protected val output = logicalPlan.output
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 8c5c9ef..41eaf84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -30,8 +30,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.{Encoder, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming.{Offset => _, _}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions}
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, ScanConfig, ScanConfigBuilder}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming._
 import org.apache.spark.util.RpcUtils
 
@@ -44,15 +43,10 @@ import org.apache.spark.util.RpcUtils
  *    the specified offset within the list, or null if that offset doesn't yet have a record.
  */
 class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
-  extends MemoryStreamBase[A](sqlContext)
-    with ContinuousReadSupportProvider with ContinuousReadSupport {
+  extends MemoryStreamBase[A](sqlContext) with ContinuousStream {
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
-  protected val logicalPlan =
-    // TODO: don't pass null as table after finish API refactor for continuous stream.
-    StreamingRelationV2(this, "memory", null, Map(), attributes, None)(sqlContext.sparkSession)
-
   // ContinuousReader implementation
 
   @GuardedBy("this")
@@ -87,13 +81,9 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
     )
   }
 
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
-    new SimpleStreamingScanConfigBuilder(fullSchema(), start)
-  }
 
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
-    val startOffset = config.asInstanceOf[SimpleStreamingScanConfig]
-      .start.asInstanceOf[ContinuousMemoryStreamOffset]
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    val startOffset = start.asInstanceOf[ContinuousMemoryStreamOffset]
     synchronized {
       val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
       endpointRef =
@@ -105,8 +95,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
     }
   }
 
-  override def createContinuousReaderFactory(
-      config: ScanConfig): ContinuousPartitionReaderFactory = {
+  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
     ContinuousMemoryStreamReaderFactory
   }
 
@@ -115,12 +104,6 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
   }
 
   override def commit(end: Offset): Unit = {}
-
-  // ContinuousReadSupportProvider implementation
-  // This is necessary because of how StreamTest finds the source for AddDataMemory steps.
-  override def createContinuousReadSupport(
-      checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReadSupport = this
 }
 
 object ContinuousMemoryStream {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 8d334f0..075c6b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.streaming.sources
 
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReadSupport
+import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousStream
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.types._
 
 /**
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
  *    be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
  */
 class RateStreamProvider extends DataSourceV2
-  with TableProvider with ContinuousReadSupportProvider with DataSourceRegister {
+  with TableProvider with DataSourceRegister {
   import RateStreamProvider._
 
   override def getTable(options: DataSourceOptions): Table = {
@@ -68,12 +68,6 @@ class RateStreamProvider extends DataSourceV2
     new RateStreamTable(rowsPerSecond, rampUpTimeSeconds, numPartitions)
   }
 
-  override def createContinuousReadSupport(
-     checkpointLocation: String,
-     options: DataSourceOptions): ContinuousReadSupport = {
-    new RateStreamContinuousReadSupport(options)
-  }
-
   override def shortName(): String = "rate"
 }
 
@@ -81,7 +75,7 @@ class RateStreamTable(
     rowsPerSecond: Long,
     rampUpTimeSeconds: Long,
     numPartitions: Int)
-  extends Table with SupportsMicroBatchRead {
+  extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
 
   override def name(): String = {
     s"RateStream(rowsPerSecond=$rowsPerSecond, rampUpTimeSeconds=$rampUpTimeSeconds, " +
@@ -98,6 +92,10 @@ class RateStreamTable(
         new RateStreamMicroBatchStream(
           rowsPerSecond, rampUpTimeSeconds, numPartitions, options, checkpointLocation)
       }
+
+      override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+        new RateStreamContinuousStream(rowsPerSecond, numPartitions, options)
+      }
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index ddf398b..540131c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -26,7 +26,6 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.LongOffset
@@ -40,7 +39,8 @@ import org.apache.spark.unsafe.types.UTF8String
  * and debugging. This MicroBatchReadSupport will *not* work in production applications due to
  * multiple reasons, including no support for fault recovery.
  */
-class TextSocketMicroBatchStream(host: String, port: Int, options: DataSourceOptions)
+class TextSocketMicroBatchStream(
+    host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
   extends MicroBatchStream with Logging {
 
   @GuardedBy("this")
@@ -124,10 +124,6 @@ class TextSocketMicroBatchStream(host: String, port: Int, options: DataSourceOpt
       batches.slice(sliceStart, sliceEnd)
     }
 
-    assert(SparkSession.getActiveSession.isDefined)
-    val spark = SparkSession.getActiveSession.get
-    val numPartitions = spark.sparkContext.defaultParallelism
-
     val slices = Array.fill(numPartitions)(new ListBuffer[(UTF8String, Long)])
     rawList.zipWithIndex.foreach { case (r, idx) =>
       slices(idx % numPartitions).append(r)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index 3500778..c3b24a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -24,16 +24,15 @@ import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReadSupport
+import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousStream
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
 class TextSocketSourceProvider extends DataSourceV2
-  with TableProvider with ContinuousReadSupportProvider
-  with DataSourceRegister with Logging {
+  with TableProvider with DataSourceRegister with Logging {
 
   private def checkParameters(params: DataSourceOptions): Unit = {
     logWarning("The socket source should not be used for production applications! " +
@@ -58,22 +57,16 @@ class TextSocketSourceProvider extends DataSourceV2
     new TextSocketTable(
       options.get("host").get,
       options.getInt("port", -1),
+      options.getInt("numPartitions", SparkSession.active.sparkContext.defaultParallelism),
       options.getBoolean("includeTimestamp", false))
   }
 
-  override def createContinuousReadSupport(
-      checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReadSupport = {
-    checkParameters(options)
-    new TextSocketContinuousReadSupport(options)
-  }
-
   /** String that represents the format that this data source provider uses. */
   override def shortName(): String = "socket"
 }
 
-class TextSocketTable(host: String, port: Int, includeTimestamp: Boolean)
-  extends Table with SupportsMicroBatchRead {
+class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimestamp: Boolean)
+  extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
 
   override def name(): String = s"Socket[$host:$port]"
 
@@ -90,7 +83,11 @@ class TextSocketTable(host: String, port: Int, includeTimestamp: Boolean)
       override def readSchema(): StructType = schema()
 
       override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
-        new TextSocketMicroBatchStream(host, port, options)
+        new TextSocketMicroBatchStream(host, port, numPartitions, options)
+      }
+
+      override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+        new TextSocketContinuousStream(host, port, numPartitions, options)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 417dd55..8666818 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -30,9 +30,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
 
 /**
  * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
@@ -183,39 +181,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
           case _ => provider.getTable(dsOptions)
         }
         table match {
-          case s: SupportsMicroBatchRead =>
+          case _: SupportsMicroBatchRead | _: SupportsContinuousRead =>
             Dataset.ofRows(
               sparkSession,
               StreamingRelationV2(
-                provider, source, s, options,
-                table.schema.toAttributes, v1Relation)(sparkSession))
-
-          case _ if ds.isInstanceOf[ContinuousReadSupportProvider] =>
-            val provider = ds.asInstanceOf[ContinuousReadSupportProvider]
-            var tempReadSupport: ContinuousReadSupport = null
-            val schema = try {
-              val tmpCheckpointPath = Utils.createTempDir(namePrefix = s"tempCP").getCanonicalPath
-              tempReadSupport = if (userSpecifiedSchema.isDefined) {
-                provider.createContinuousReadSupport(
-                  userSpecifiedSchema.get, tmpCheckpointPath, dsOptions)
-              } else {
-                provider.createContinuousReadSupport(tmpCheckpointPath, dsOptions)
-              }
-              tempReadSupport.fullSchema()
-            } finally {
-              // Stop tempReader to avoid side-effect thing
-              if (tempReadSupport != null) {
-                tempReadSupport.stop()
-                tempReadSupport = null
-              }
-            }
-            Dataset.ofRows(
-              sparkSession,
-              // TODO: do not pass null as table after finish the API refactor for continuous
-              // stream.
-              StreamingRelationV2(
-                provider, source, table = null, options,
-                schema.toAttributes, v1Relation)(sparkSession))
+                provider, source, table, options, table.schema.toAttributes, v1Relation)(
+                sparkSession))
 
           // fallback to v1
           case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index d40a1fd..d0418f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.util.ManualClock
@@ -308,30 +308,17 @@ class RateStreamProviderSuite extends StreamTest {
       "rate source does not support user-specified schema"))
   }
 
-  test("continuous in registry") {
-    DataSource.lookupDataSource("rate", spark.sqlContext.conf).
-      getConstructor().newInstance() match {
-      case ds: ContinuousReadSupportProvider =>
-        val readSupport = ds.createContinuousReadSupport(
-          "", DataSourceOptions.empty())
-        assert(readSupport.isInstanceOf[RateStreamContinuousReadSupport])
-      case _ =>
-        throw new IllegalStateException("Could not find read support for continuous rate")
-    }
-  }
-
   test("continuous data") {
-    val readSupport = new RateStreamContinuousReadSupport(
-      new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
-    val config = readSupport.newScanConfigBuilder(readSupport.initialOffset).build()
-    val tasks = readSupport.planInputPartitions(config)
-    val readerFactory = readSupport.createContinuousReaderFactory(config)
-    assert(tasks.size == 2)
+    val stream = new RateStreamContinuousStream(
+      rowsPerSecond = 20, numPartitions = 2, options = DataSourceOptions.empty())
+    val partitions = stream.planInputPartitions(stream.initialOffset)
+    val readerFactory = stream.createContinuousReaderFactory()
+    assert(partitions.size == 2)
 
     val data = scala.collection.mutable.ListBuffer[InternalRow]()
-    tasks.foreach {
+    partitions.foreach {
       case t: RateStreamContinuousInputPartition =>
-        val startTimeMs = readSupport.initialOffset()
+        val startTimeMs = stream.initialOffset()
           .asInstanceOf[RateStreamOffset]
           .partitionToValueAndRunTimeMs(t.partitionIndex)
           .runTimeMs
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index cf069d5..33c65d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
@@ -294,25 +295,25 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     serverThread = new ServerThread()
     serverThread.start()
 
-    val readSupport = new TextSocketContinuousReadSupport(
-      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
-        "port" -> serverThread.port.toString).asJava))
-
-    val scanConfig = readSupport.newScanConfigBuilder(readSupport.initialOffset()).build()
-    val tasks = readSupport.planInputPartitions(scanConfig)
-    assert(tasks.size == 2)
+    val stream = new TextSocketContinuousStream(
+      host = "localhost",
+      port = serverThread.port,
+      numPartitions = 2,
+      options = DataSourceOptions.empty())
+    val partitions = stream.planInputPartitions(stream.initialOffset())
+    assert(partitions.length == 2)
 
     val numRecords = 10
     val data = scala.collection.mutable.ListBuffer[Int]()
     val offsets = scala.collection.mutable.ListBuffer[Int]()
-    val readerFactory = readSupport.createContinuousReaderFactory(scanConfig)
+    val readerFactory = stream.createContinuousReaderFactory()
     import org.scalatest.time.SpanSugar._
     failAfter(5 seconds) {
       // inject rows, read and check the data and offsets
       for (i <- 0 until numRecords) {
         serverThread.enqueue(i.toString)
       }
-      tasks.foreach {
+      partitions.foreach {
         case t: TextSocketContinuousInputPartition =>
           val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
           for (i <- 0 until numRecords / 2) {
@@ -330,15 +331,15 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
           data.clear()
         case _ => throw new IllegalStateException("Unexpected task type")
       }
-      assert(readSupport.startOffset.offsets == List(3, 3))
-      readSupport.commit(TextSocketOffset(List(5, 5)))
-      assert(readSupport.startOffset.offsets == List(5, 5))
+      assert(stream.startOffset.offsets == List(3, 3))
+      stream.commit(TextSocketOffset(List(5, 5)))
+      assert(stream.startOffset.offsets == List(5, 5))
     }
 
     def commitOffset(partition: Int, offset: Int): Unit = {
-      val offsetsToCommit = readSupport.startOffset.offsets.updated(partition, offset)
-      readSupport.commit(TextSocketOffset(offsetsToCommit))
-      assert(readSupport.startOffset.offsets == offsetsToCommit)
+      val offsetsToCommit = stream.startOffset.offsets.updated(partition, offset)
+      stream.commit(TextSocketOffset(offsetsToCommit))
+      assert(stream.startOffset.offsets == offsetsToCommit)
     }
   }
 
@@ -346,13 +347,15 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     serverThread = new ServerThread()
     serverThread.start()
 
-    val readSupport = new TextSocketContinuousReadSupport(
-      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
-        "port" -> serverThread.port.toString).asJava))
+    val stream = new TextSocketContinuousStream(
+      host = "localhost",
+      port = serverThread.port,
+      numPartitions = 2,
+      options = DataSourceOptions.empty())
 
-    readSupport.startOffset = TextSocketOffset(List(5, 5))
+    stream.startOffset = TextSocketOffset(List(5, 5))
     assertThrows[IllegalStateException] {
-      readSupport.commit(TextSocketOffset(List(6, 6)))
+      stream.commit(TextSocketOffset(List(6, 6)))
     }
   }
 
@@ -360,21 +363,21 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     serverThread = new ServerThread()
     serverThread.start()
 
-    val readSupport = new TextSocketContinuousReadSupport(
-      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
-        "includeTimestamp" -> "true",
-        "port" -> serverThread.port.toString).asJava))
-    val scanConfig = readSupport.newScanConfigBuilder(readSupport.initialOffset()).build()
-    val tasks = readSupport.planInputPartitions(scanConfig)
-    assert(tasks.size == 2)
+    val stream = new TextSocketContinuousStream(
+      host = "localhost",
+      port = serverThread.port,
+      numPartitions = 2,
+      options = new DataSourceOptions(Map("includeTimestamp" -> "true").asJava))
+    val partitions = stream.planInputPartitions(stream.initialOffset())
+    assert(partitions.size == 2)
 
     val numRecords = 4
     // inject rows, read and check the data and offsets
     for (i <- 0 until numRecords) {
       serverThread.enqueue(i.toString)
     }
-    val readerFactory = readSupport.createContinuousReaderFactory(scanConfig)
-    tasks.foreach {
+    val readerFactory = stream.createContinuousReaderFactory()
+    partitions.foreach {
       case t: TextSocketContinuousInputPartition =>
         val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
         for (_ <- 0 until numRecords / 2) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index c60ea4a..511fdfe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -24,7 +24,7 @@ import test.org.apache.spark.sql.sources.v2._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation}
 import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.functions._
@@ -40,14 +40,14 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
 
   private def getBatch(query: DataFrame): AdvancedBatch = {
     query.queryExecution.executedPlan.collect {
-      case d: DataSourceV2ScanExec =>
+      case d: BatchScanExec =>
         d.batch.asInstanceOf[AdvancedBatch]
     }.head
   }
 
   private def getJavaBatch(query: DataFrame): JavaAdvancedDataSourceV2.AdvancedBatch = {
     query.queryExecution.executedPlan.collect {
-      case d: DataSourceV2ScanExec =>
+      case d: BatchScanExec =>
         d.batch.asInstanceOf[JavaAdvancedDataSourceV2.AdvancedBatch]
     }.head
   }
@@ -309,7 +309,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       assert(logical.canonicalized.output.length == logicalNumOutput)
 
       val physical = df.queryExecution.executedPlan.collect {
-        case d: DataSourceV2ScanExec => d
+        case d: BatchScanExec => d
       }.head
       assert(physical.canonicalized.output.length == physicalNumOutput)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 868b43c..659deb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -495,7 +495,7 @@ class StreamSuite extends StreamTest {
       // `extended = false` only displays the physical plan.
       assert("StreamingDataSourceV2Relation".r
         .findAllMatchIn(explainWithoutExtended).size === 0)
-      assert("ScanV2".r
+      assert("BatchScan".r
         .findAllMatchIn(explainWithoutExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithoutExtended.contains("StateStoreRestore"))
@@ -505,7 +505,7 @@ class StreamSuite extends StreamTest {
       // plan.
       assert("StreamingDataSourceV2Relation".r
         .findAllMatchIn(explainWithExtended).size === 3)
-      assert("ScanV2".r
+      assert("BatchScan".r
         .findAllMatchIn(explainWithExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithExtended.contains("StateStoreRestore"))
@@ -548,17 +548,17 @@ class StreamSuite extends StreamTest {
       val explainWithoutExtended = q.explainInternal(false)
 
       // `extended = false` only displays the physical plan.
-      assert("Streaming RelationV2 ContinuousMemoryStream".r
+      assert("StreamingDataSourceV2Relation".r
         .findAllMatchIn(explainWithoutExtended).size === 0)
-      assert("ScanV2 ContinuousMemoryStream".r
+      assert("ContinuousScan".r
         .findAllMatchIn(explainWithoutExtended).size === 1)
 
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical
       // plan.
-      assert("Streaming RelationV2 ContinuousMemoryStream".r
+      assert("StreamingDataSourceV2Relation".r
         .findAllMatchIn(explainWithExtended).size === 3)
-      assert("ScanV2 ContinuousMemoryStream".r
+      assert("ContinuousScan".r
         .findAllMatchIn(explainWithExtended).size === 1)
     } finally {
       q.stop()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index b4bd6f6..da49683 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -39,12 +39,11 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.AllTuples
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation}
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
 import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
 import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -692,16 +691,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
                   case r: StreamingExecutionRelation => r.source
                   // v2 source
                   case r: StreamingDataSourceV2Relation => r.stream
-                  case r: OldStreamingDataSourceV2Relation => r.readSupport
                   // We can add data to memory stream before starting it. Then the input plan has
                   // not been processed by the streaming engine and contains `StreamingRelationV2`.
                   case r: StreamingRelationV2 if r.sourceName == "memory" =>
-                    // TODO: remove this null hack after finish API refactor for continuous stream.
-                    if (r.table == null) {
-                      r.dataSource.asInstanceOf[ContinuousReadSupport]
-                    } else {
-                      r.table.asInstanceOf[MemoryStreamTable].stream
-                    }
+                    r.table.asInstanceOf[MemoryStreamTable].stream
                 }
                 .zipWithIndex
                 .find(_._1 == source)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 62fde98..dc22e31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, ScanConfig}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType
@@ -911,7 +911,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
     testStream(df, useV2Sink = true)(
       StartStream(trigger = Trigger.Continuous(100)),
-      AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
+      AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingDataSourceV2Relation"))
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index d6819ea..d3d210c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
@@ -44,7 +44,7 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
     super.beforeEach()
     epochEndpoint = EpochCoordinatorRef.create(
       mock[StreamingWriteSupport],
-      mock[ContinuousReadSupport],
+      mock[ContinuousStream],
       mock[ContinuousExecution],
       coordinatorId,
       startEpoch,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index f85cae9..344a8aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -41,7 +41,7 @@ class ContinuousSuiteBase extends StreamTest {
       case s: ContinuousExecution =>
         assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
         val reader = s.lastExecution.executedPlan.collectFirst {
-          case ContinuousScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r
+          case ContinuousScanExec(_, _, r: RateStreamContinuousStream, _) => r
         }.get
 
         val deltaMs = numTriggers * 1000 + 300
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index e644c16..a0b56ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.test.TestSparkSession
@@ -45,7 +45,7 @@ class EpochCoordinatorSuite
   private var orderVerifier: InOrder = _
 
   override def beforeEach(): Unit = {
-    val reader = mock[ContinuousReadSupport]
+    val stream = mock[ContinuousStream]
     writeSupport = mock[StreamingWriteSupport]
     query = mock[ContinuousExecution]
     orderVerifier = inOrder(writeSupport, query)
@@ -53,7 +53,7 @@ class EpochCoordinatorSuite
     spark = new TestSparkSession()
 
     epochCoordinator
-      = EpochCoordinatorRef.create(writeSupport, reader, query, "test", 1, spark, SparkEnv.get)
+      = EpochCoordinatorRef.create(writeSupport, stream, query, "test", 1, spark, SparkEnv.get)
   }
 
   test("single epoch") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index d98cc41..62f1666 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -31,33 +31,23 @@ import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, T
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-class FakeDataStream extends MicroBatchStream {
+class FakeDataStream extends MicroBatchStream with ContinuousStream {
   override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
   override def commit(end: Offset): Unit = {}
   override def stop(): Unit = {}
   override def initialOffset(): Offset = RateStreamOffset(Map())
   override def latestOffset(): Offset = RateStreamOffset(Map())
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
   override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
-  override def createReaderFactory(): PartitionReaderFactory = {
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
-}
-
-case class FakeReadSupport() extends ContinuousReadSupport {
-  override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
-  override def commit(end: Offset): Unit = {}
-  override def stop(): Unit = {}
-  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
-  override def fullSchema(): StructType = StructType(Seq())
-  override def initialOffset(): Offset = RateStreamOffset(Map())
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = null
-  override def createContinuousReaderFactory(
-      config: ScanConfig): ContinuousPartitionReaderFactory = {
+  override def createReaderFactory(): PartitionReaderFactory = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
-  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
+  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }
@@ -66,21 +56,19 @@ class FakeScanBuilder extends ScanBuilder with Scan {
   override def build(): Scan = this
   override def readSchema(): StructType = StructType(Seq())
   override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = new FakeDataStream
+  override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream
 }
 
-class FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
+trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
   override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
 }
 
-trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
-  override def createContinuousReadSupport(
-      checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReadSupport = {
-    LastReadOptions.options = options
-    FakeReadSupport()
-  }
+trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
+  override def name(): String = "fake"
+  override def schema(): StructType = StructType(Seq())
+  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
 }
 
 trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
@@ -111,27 +99,34 @@ class FakeReadMicroBatchOnly
 class FakeReadContinuousOnly
     extends DataSourceRegister
     with TableProvider
-    with FakeContinuousReadSupportProvider
     with SessionConfigSupport {
   override def shortName(): String = "fake-read-continuous-only"
 
   override def keyPrefix: String = shortName()
 
-  override def getTable(options: DataSourceOptions): Table = new Table {
-    override def schema(): StructType = StructType(Seq())
-    override def name(): String = "fake"
+  override def getTable(options: DataSourceOptions): Table = {
+    LastReadOptions.options = options
+    new FakeContinuousReadTable {}
   }
 }
 
-class FakeReadBothModes extends DataSourceRegister
-    with TableProvider with FakeContinuousReadSupportProvider {
+class FakeReadBothModes extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-read-microbatch-continuous"
 
-  override def getTable(options: DataSourceOptions): Table = new FakeMicroBatchReadTable {}
+  override def getTable(options: DataSourceOptions): Table = {
+    new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {}
+  }
 }
 
-class FakeReadNeitherMode extends DataSourceRegister {
+class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
   override def shortName(): String = "fake-read-neither-mode"
+
+  override def getTable(options: DataSourceOptions): Table = {
+    new Table {
+      override def name(): String = "fake"
+      override def schema(): StructType = StructType(Nil)
+    }
+  }
 }
 
 class FakeWriteSupportProvider
@@ -324,33 +319,25 @@ class StreamingDataSourceV2Suite extends StreamTest {
 
   for ((read, write, trigger) <- cases) {
     testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
-      val readSource = DataSource.lookupDataSource(read, spark.sqlContext.conf).
-        getConstructor().newInstance()
+      val table = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+        .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
       val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).
         getConstructor().newInstance()
 
-      def isMicroBatch(ds: Any): Boolean = ds match {
-        case provider: TableProvider =>
-          val table = provider.getTable(DataSourceOptions.empty())
-          table.isInstanceOf[SupportsMicroBatchRead]
-        case _ => false
-      }
-
-      (readSource, writeSource, trigger) match {
+      (table, writeSource, trigger) match {
         // Valid microbatch queries.
-        case (_: TableProvider, _: StreamingWriteSupportProvider, t)
-          if isMicroBatch(readSource) && !t.isInstanceOf[ContinuousTrigger] =>
+        case (_: SupportsMicroBatchRead, _: StreamingWriteSupportProvider, t)
+            if !t.isInstanceOf[ContinuousTrigger] =>
           testPositiveCase(read, write, trigger)
 
         // Valid continuous queries.
-        case (_: ContinuousReadSupportProvider, _: StreamingWriteSupportProvider,
+        case (_: SupportsContinuousRead, _: StreamingWriteSupportProvider,
               _: ContinuousTrigger) =>
           testPositiveCase(read, write, trigger)
 
         // Invalid - can't read at all
-        case (r, _, _)
-            if !r.isInstanceOf[TableProvider]
-              && !r.isInstanceOf[ContinuousReadSupportProvider] =>
+        case (r, _, _) if !r.isInstanceOf[SupportsMicroBatchRead] &&
+            !r.isInstanceOf[SupportsContinuousRead] =>
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support streamed reading")
 
@@ -361,14 +348,13 @@ class StreamingDataSourceV2Suite extends StreamTest {
 
         // Invalid - trigger is continuous but reader is not
         case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
-            if !r.isInstanceOf[ContinuousReadSupportProvider] =>
+            if !r.isInstanceOf[SupportsContinuousRead] =>
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support continuous processing")
 
         // Invalid - trigger is microbatch but reader is not
-        case (r, _, t)
-           if !isMicroBatch(r) &&
-             !t.isInstanceOf[ContinuousTrigger] =>
+        case (r, _, t) if !r.isInstanceOf[SupportsMicroBatchRead] &&
+            !t.isInstanceOf[ContinuousTrigger] =>
           testPostCreationNegativeCase(read, write, trigger,
             s"Data source $read does not support microbatch processing")
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org