You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/01/08 21:23:14 UTC

spark git commit: [SPARK-22912] v2 data source support in MicroBatchExecution

Repository: spark
Updated Branches:
  refs/heads/master eed82a0b2 -> 4f7e75883


[SPARK-22912] v2 data source support in MicroBatchExecution

## What changes were proposed in this pull request?

Support for v2 data sources in microbatch streaming.

## How was this patch tested?

A very basic new unit test on the toy v2 implementation of the rate source. Once we have a v1 source fully migrated to v2, we'll need to do more detailed compatibility testing.

Author: Jose Torres <jo...@databricks.com>

Closes #20097 from jose-torres/v2-impl.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7e7588
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7e7588
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7e7588

Branch: refs/heads/master
Commit: 4f7e75883436069c2d9028c4cd5daa78e8d59560
Parents: eed82a0
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Jan 8 13:24:08 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jan 8 13:24:08 2018 -0800

----------------------------------------------------------------------
 ....apache.spark.sql.sources.DataSourceRegister |   1 +
 .../datasources/v2/DataSourceV2Relation.scala   |  10 ++
 .../streaming/MicroBatchExecution.scala         | 112 +++++++++++++++----
 .../execution/streaming/ProgressReporter.scala  |   6 +-
 .../streaming/RateSourceProvider.scala          |  10 +-
 .../execution/streaming/StreamExecution.scala   |   4 +-
 .../execution/streaming/StreamingRelation.scala |   4 +-
 .../continuous/ContinuousExecution.scala        |   4 +-
 .../continuous/ContinuousRateStreamSource.scala |  17 +--
 .../streaming/sources/RateStreamSourceV2.scala  |  31 ++++-
 .../spark/sql/streaming/DataStreamReader.scala  |  25 ++++-
 .../sql/streaming/StreamingQueryManager.scala   |  24 ++--
 .../execution/streaming/RateSourceV2Suite.scala |  68 +++++++++--
 .../apache/spark/sql/streaming/StreamTest.scala |   2 +-
 .../streaming/continuous/ContinuousSuite.scala  |   2 +-
 15 files changed, 241 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 6cdfe2f..0259c77 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -7,3 +7,4 @@ org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
 org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 7eb99a6..cba20dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -35,6 +35,16 @@ case class DataSourceV2Relation(
   }
 }
 
+/**
+ * A specialization of [[DataSourceV2Relation]] whose `isStreaming` is overridden to return true.
+ * It passes `fullOutput` and `reader` straight through to the parent and changes nothing else.
+ */
+class StreamingDataSourceV2Relation(
+    fullOutput: Seq[AttributeReference],
+    reader: DataSourceV2Reader) extends DataSourceV2Relation(fullOutput, reader) {
+  override def isStreaming: Boolean = true
+}
+
 object DataSourceV2Relation {
   def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
     new DataSourceV2Relation(reader.readSchema().toAttributes, reader)

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 9a7a13f..42240ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.Optional
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
 
 import org.apache.spark.sql.{Dataset, SparkSession}
@@ -24,7 +27,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}
 
@@ -33,10 +39,11 @@ class MicroBatchExecution(
     name: String,
     checkpointRoot: String,
     analyzedPlan: LogicalPlan,
-    sink: Sink,
+    sink: BaseStreamingSink,
     trigger: Trigger,
     triggerClock: Clock,
     outputMode: OutputMode,
+    extraOptions: Map[String, String],
     deleteCheckpointOnStop: Boolean)
   extends StreamExecution(
     sparkSession, name, checkpointRoot, analyzedPlan, sink,
@@ -57,6 +64,13 @@ class MicroBatchExecution(
     var nextSourceId = 0L
     val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
     val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
+    // We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
+    // map as we go to ensure each identical relation gets the same StreamingExecutionRelation
+    // object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
+    // plan for the data within that batch.
+    // Note that we have to use the previous `output` as attributes in StreamingExecutionRelation,
+    // since the existing logical plan has already used those attributes. The per-microbatch
+    // transformation is responsible for replacing attributes with their final values.
     val _logicalPlan = analyzedPlan.transform {
       case streamingRelation@StreamingRelation(dataSource, _, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
@@ -64,19 +78,26 @@ class MicroBatchExecution(
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           val source = dataSource.createSource(metadataPath)
           nextSourceId += 1
-          // We still need to use the previous `output` instead of `source.schema` as attributes in
-          // "df.logicalPlan" has already used attributes of the previous `output`.
           StreamingExecutionRelation(source, output)(sparkSession)
         })
-      case s @ StreamingRelationV2(v2DataSource, _, _, output, v1DataSource)
-          if !v2DataSource.isInstanceOf[MicroBatchReadSupport] =>
+      case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
+        v2ToExecutionRelationMap.getOrElseUpdate(s, {
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+          val reader = source.createMicroBatchReader(
+            Optional.empty(), // user specified schema
+            metadataPath,
+            new DataSourceV2Options(options.asJava))
+          nextSourceId += 1
+          StreamingExecutionRelation(reader, output)(sparkSession)
+        })
+      case s @ StreamingRelationV2(_, _, _, output, v1Relation) =>
         v2ToExecutionRelationMap.getOrElseUpdate(s, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          val source = v1DataSource.createSource(metadataPath)
+          assert(v1Relation.isDefined, "v2 execution didn't match but v1 was unavailable")
+          val source = v1Relation.get.dataSource.createSource(metadataPath)
           nextSourceId += 1
-          // We still need to use the previous `output` instead of `source.schema` as attributes in
-          // "df.logicalPlan" has already used attributes of the previous `output`.
           StreamingExecutionRelation(source, output)(sparkSession)
         })
     }
@@ -192,7 +213,8 @@ class MicroBatchExecution(
                     source.getBatch(start, end)
                   }
                 case nonV1Tuple =>
-                  throw new IllegalStateException(s"Unexpected V2 source in $nonV1Tuple")
+                  // The V2 API does not have the same edge case requiring getBatch to be called
+                  // here, so we do nothing here.
               }
               currentBatchId = latestCommittedBatchId + 1
               committedOffsets ++= availableOffsets
@@ -236,14 +258,27 @@ class MicroBatchExecution(
     val hasNewData = {
       awaitProgressLock.lock()
       try {
-        val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map {
+        // Generate a map from each unique source to the next available offset.
+        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
           case s: Source =>
             updateStatusMessage(s"Getting offsets from $s")
             reportTimeTaken("getOffset") {
               (s, s.getOffset)
             }
+          case s: MicroBatchReader =>
+            updateStatusMessage(s"Getting offsets from $s")
+            reportTimeTaken("getOffset") {
+            // Once v1 streaming source execution is gone, we can refactor this away.
+            // For now, we set the range here to get the source to infer the available end offset,
+            // get that offset, and then set the range again when we later execute.
+            s.setOffsetRange(
+              toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
+              Optional.empty())
+
+              (s, Some(s.getEndOffset))
+            }
         }.toMap
-        availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
+        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
 
         if (dataAvailable) {
           true
@@ -317,6 +352,8 @@ class MicroBatchExecution(
           if (prevBatchOff.isDefined) {
             prevBatchOff.get.toStreamProgress(sources).foreach {
               case (src: Source, off) => src.commit(off)
+              case (reader: MicroBatchReader, off) =>
+                reader.commit(reader.deserializeOffset(off.json))
             }
           } else {
             throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
@@ -357,7 +394,16 @@ class MicroBatchExecution(
             s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
               s"${batch.queryExecution.logical}")
           logDebug(s"Retrieving data from $source: $current -> $available")
-          Some(source -> batch)
+          Some(source -> batch.logicalPlan)
+        case (reader: MicroBatchReader, available)
+          if committedOffsets.get(reader).map(_ != available).getOrElse(true) =>
+          val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json))
+          reader.setOffsetRange(
+            toJava(current),
+            Optional.of(available.asInstanceOf[OffsetV2]))
+          logDebug(s"Retrieving data from $reader: $current -> $available")
+          Some(reader ->
+            new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
         case _ => None
       }
     }
@@ -365,15 +411,14 @@ class MicroBatchExecution(
     // A list of attributes that will need to be updated.
     val replacements = new ArrayBuffer[(Attribute, Attribute)]
     // Replace sources in the logical plan with data that has arrived since the last batch.
-    val withNewSources = logicalPlan transform {
+    val newBatchesPlan = logicalPlan transform {
       case StreamingExecutionRelation(source, output) =>
-        newData.get(source).map { data =>
-          val newPlan = data.logicalPlan
-          assert(output.size == newPlan.output.size,
+        newData.get(source).map { dataPlan =>
+          assert(output.size == dataPlan.output.size,
             s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
-              s"${Utils.truncatedString(newPlan.output, ",")}")
-          replacements ++= output.zip(newPlan.output)
-          newPlan
+              s"${Utils.truncatedString(dataPlan.output, ",")}")
+          replacements ++= output.zip(dataPlan.output)
+          dataPlan
         }.getOrElse {
           LocalRelation(output, isStreaming = true)
         }
@@ -381,7 +426,7 @@ class MicroBatchExecution(
 
     // Rewire the plan to use the new attributes that were returned by the source.
     val replacementMap = AttributeMap(replacements)
-    val triggerLogicalPlan = withNewSources transformAllExpressions {
+    val newAttributePlan = newBatchesPlan transformAllExpressions {
       case a: Attribute if replacementMap.contains(a) =>
         replacementMap(a).withMetadata(a.metadata)
       case ct: CurrentTimestamp =>
@@ -392,6 +437,20 @@ class MicroBatchExecution(
           cd.dataType, cd.timeZoneId)
     }
 
+    val triggerLogicalPlan = sink match {
+      case _: Sink => newAttributePlan
+      case s: MicroBatchWriteSupport =>
+        val writer = s.createMicroBatchWriter(
+          s"$runId",
+          currentBatchId,
+          newAttributePlan.schema,
+          outputMode,
+          new DataSourceV2Options(extraOptions.asJava))
+        assert(writer.isPresent, "microbatch writer must always be present")
+        WriteToDataSourceV2(writer.get, newAttributePlan)
+      case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
+    }
+
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
         sparkSessionToRunBatch,
@@ -409,7 +468,12 @@ class MicroBatchExecution(
 
     reportTimeTaken("addBatch") {
       SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
-        sink.addBatch(currentBatchId, nextBatch)
+        sink match {
+          case s: Sink => s.addBatch(currentBatchId, nextBatch)
+          case s: MicroBatchWriteSupport =>
+            // This doesn't accumulate any data - it just forces execution of the microbatch writer.
+            nextBatch.collect()
+        }
       }
     }
 
@@ -421,4 +485,8 @@ class MicroBatchExecution(
       awaitProgressLock.unlock()
     }
   }
+
+  private def toJava(scalaOption: Option[OffsetV2]): Optional[OffsetV2] = {
+    Optional.ofNullable(scalaOption.orNull) // None (or Some(null)) becomes Optional.empty()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 1c90436..d1e5be9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -53,7 +53,7 @@ trait ProgressReporter extends Logging {
   protected def triggerClock: Clock
   protected def logicalPlan: LogicalPlan
   protected def lastExecution: QueryExecution
-  protected def newData: Map[BaseStreamingSource, DataFrame]
+  protected def newData: Map[BaseStreamingSource, LogicalPlan]
   protected def availableOffsets: StreamProgress
   protected def committedOffsets: StreamProgress
   protected def sources: Seq[BaseStreamingSource]
@@ -225,8 +225,8 @@ trait ProgressReporter extends Logging {
     //
     // 3. For each source, we sum the metrics of the associated execution plan leaves.
     //
-    val logicalPlanLeafToSource = newData.flatMap { case (source, df) =>
-      df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
+      logicalPlan.collectLeaves().map { leaf => leaf -> source }
     }
     val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
     val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index d02cf88..66eb016 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -29,12 +29,12 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamReader
-import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader
+import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader
+import org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
-import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{ManualClock, SystemClock}
 
@@ -112,7 +112,7 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister
       schema: Optional[StructType],
       checkpointLocation: String,
       options: DataSourceV2Options): ContinuousReader = {
-    new ContinuousRateStreamReader(options)
+    new RateStreamContinuousReader(options)
   }
 
   override def shortName(): String = "rate"

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3e76bf7..24a8b00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -163,7 +163,7 @@ abstract class StreamExecution(
   var lastExecution: IncrementalExecution = _
 
   /** Holds the most recent input data for each source. */
-  protected var newData: Map[BaseStreamingSource, DataFrame] = _
+  protected var newData: Map[BaseStreamingSource, LogicalPlan] = _
 
   @volatile
   protected var streamDeathCause: StreamingQueryException = null
@@ -418,7 +418,7 @@ abstract class StreamExecution(
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
-  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+  private[sql] def awaitOffset(source: BaseStreamingSource, newOffset: Offset): Unit = {
     assertAwaitThread()
     def notDone = {
       val localCommittedOffsets = committedOffsets

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index a9d50e3..a0ee683 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -61,7 +61,7 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
  * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
  */
 case class StreamingExecutionRelation(
-    source: Source,
+    source: BaseStreamingSource,
     output: Seq[Attribute])(session: SparkSession)
   extends LeafNode {
 
@@ -92,7 +92,7 @@ case class StreamingRelationV2(
     sourceName: String,
     extraOptions: Map[String, String],
     output: Seq[Attribute],
-    v1DataSource: DataSource)(session: SparkSession)
+    v1Relation: Option[StreamingRelation])(session: SparkSession)
   extends LeafNode {
   override def isStreaming: Boolean = true
   override def toString: String = sourceName

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 2843ab1..9657b5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
 import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
@@ -174,7 +174,7 @@ class ContinuousExecution(
         val loggedOffset = offsets.offsets(0)
         val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
         reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
-        DataSourceV2Relation(newOutput, reader)
+        new StreamingDataSourceV2Relation(newOutput, reader)
     }
 
     // Rewire the plan to use the new attributes that were returned by the source.

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index c9aa78a..b4b21e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
 
-case class ContinuousRateStreamPartitionOffset(
+case class RateStreamPartitionOffset(
    partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
 
-class ContinuousRateStreamReader(options: DataSourceV2Options)
+class RateStreamContinuousReader(options: DataSourceV2Options)
   extends ContinuousReader {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 
@@ -48,7 +48,7 @@ class ContinuousRateStreamReader(options: DataSourceV2Options)
   override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
     assert(offsets.length == numPartitions)
     val tuples = offsets.map {
-      case ContinuousRateStreamPartitionOffset(i, currVal, nextRead) =>
+      case RateStreamPartitionOffset(i, currVal, nextRead) =>
         (i, ValueRunTimeMsPair(currVal, nextRead))
     }
     RateStreamOffset(Map(tuples: _*))
@@ -86,7 +86,7 @@ class ContinuousRateStreamReader(options: DataSourceV2Options)
       val start = partitionStartMap(i)
       // Have each partition advance by numPartitions each row, with starting points staggered
       // by their partition index.
-      RateStreamReadTask(
+      RateStreamContinuousReadTask(
         start.value,
         start.runTimeMs,
         i,
@@ -101,7 +101,7 @@ class ContinuousRateStreamReader(options: DataSourceV2Options)
 
 }
 
-case class RateStreamReadTask(
+case class RateStreamContinuousReadTask(
     startValue: Long,
     startTimeMs: Long,
     partitionIndex: Int,
@@ -109,10 +109,11 @@ case class RateStreamReadTask(
     rowsPerSecond: Double)
   extends ReadTask[Row] {
   override def createDataReader(): DataReader[Row] =
-    new RateStreamDataReader(startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
+    new RateStreamContinuousDataReader(
+      startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
 }
 
-class RateStreamDataReader(
+class RateStreamContinuousDataReader(
     startValue: Long,
     startTimeMs: Long,
     partitionIndex: Int,
@@ -151,5 +152,5 @@ class RateStreamDataReader(
   override def close(): Unit = {}
 
   override def getOffset(): PartitionOffset =
-    ContinuousRateStreamPartitionOffset(partitionIndex, currentValue, nextReadTime)
+    RateStreamPartitionOffset(partitionIndex, currentValue, nextReadTime)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
index 97bada08..c0ed12c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
@@ -28,17 +28,38 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{ManualClock, SystemClock}
 
-class RateStreamV2Reader(options: DataSourceV2Options)
+/**
+ * Temporary register, exposed as "ratev2", used while the v2 migration is being built out.
+ * Microbatch read support should eventually be implemented in the same register as v1.
+ */
+class RateSourceProviderV2 extends DataSourceV2 with MicroBatchReadSupport with DataSourceRegister {
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
+      checkpointLocation: String,
+      options: DataSourceV2Options): MicroBatchReader = {
+    new RateStreamMicroBatchReader(options) // schema and checkpointLocation are not used
+  }
+
+  override def shortName(): String = "ratev2"
+}
+
+class RateStreamMicroBatchReader(options: DataSourceV2Options)
   extends MicroBatchReader {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 
-  val clock = new SystemClock
+  val clock = {
+    // The option to use a manual clock is provided only for unit testing purposes.
+    if (options.get("useManualClock").orElse("false").toBoolean) new ManualClock
+    else new SystemClock
+  }
 
   private val numPartitions =
     options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
@@ -111,7 +132,7 @@ class RateStreamV2Reader(options: DataSourceV2Options)
 
       val packedRows = mutable.ListBuffer[(Long, Long)]()
       var outVal = startVal + numPartitions
-      var outTimeMs = startTimeMs + msPerPartitionBetweenRows
+      var outTimeMs = startTimeMs
       while (outVal <= endVal) {
         packedRows.append((outTimeMs, outVal))
         outVal += numPartitions

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 2e92bee..52f2e26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import java.util.Locale
+import java.util.{Locale, Optional}
 
 import scala.collection.JavaConverters._
 
@@ -27,8 +27,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -166,19 +167,31 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
       userSpecifiedSchema = userSpecifiedSchema,
       className = source,
       options = extraOptions.toMap)
+    val v1Relation = ds match {
+      case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
+      case _ => None
+    }
     ds match {
+      case s: MicroBatchReadSupport =>
+        val tempReader = s.createMicroBatchReader(
+          Optional.ofNullable(userSpecifiedSchema.orNull),
+          Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
+          options)
+        Dataset.ofRows(
+          sparkSession,
+          StreamingRelationV2(
+            s, source, extraOptions.toMap,
+            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
       case s: ContinuousReadSupport =>
         val tempReader = s.createContinuousReader(
-          java.util.Optional.ofNullable(userSpecifiedSchema.orNull),
+          Optional.ofNullable(userSpecifiedSchema.orNull),
           Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
           options)
-        // Generate the V1 node to catch errors thrown within generation.
-        StreamingRelation(v1DataSource)
         Dataset.ofRows(
           sparkSession,
           StreamingRelationV2(
             s, source, extraOptions.toMap,
-            tempReader.readSchema().toAttributes, v1DataSource)(sparkSession))
+            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
       case _ =>
         // Code path for data source v1.
         Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index b508f44..4b27e0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -29,10 +29,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger}
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
@@ -240,31 +240,35 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
           "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
 
-    sink match {
-      case v1Sink: Sink =>
-        new StreamingQueryWrapper(new MicroBatchExecution(
+    (sink, trigger) match {
+      case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) =>
+        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
+        new StreamingQueryWrapper(new ContinuousExecution(
           sparkSession,
           userSpecifiedName.orNull,
           checkpointLocation,
           analyzedPlan,
-          v1Sink,
+          v2Sink,
           trigger,
           triggerClock,
           outputMode,
+          extraOptions,
           deleteCheckpointOnStop))
-      case v2Sink: ContinuousWriteSupport =>
-        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
-        new StreamingQueryWrapper(new ContinuousExecution(
+      case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
+        new StreamingQueryWrapper(new MicroBatchExecution(
           sparkSession,
           userSpecifiedName.orNull,
           checkpointLocation,
           analyzedPlan,
-          v2Sink,
+          sink,
           trigger,
           triggerClock,
           outputMode,
           extraOptions,
           deleteCheckpointOnStop))
+      case (_: ContinuousWriteSupport, t) if !t.isInstanceOf[ContinuousTrigger] =>
+        throw new AnalysisException(
+          "Sink only supports continuous writes, but a continuous trigger was not specified.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index e11705a..85085d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -18,20 +18,64 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.util.Optional
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamSourceV2, RateStreamV2Reader}
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamMicroBatchReader, RateStreamSourceV2}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
 import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.util.ManualClock
 
 class RateSourceV2Suite extends StreamTest {
+  import testImplicits._
+
+  case class AdvanceRateManualClock(seconds: Long) extends AddData {
+    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
+      assert(query.nonEmpty)
+      val rateSource = query.get.logicalPlan.collect {
+        case StreamingExecutionRelation(source: RateStreamMicroBatchReader, _) => source
+      }.head
+      rateSource.clock.asInstanceOf[ManualClock].advance(TimeUnit.SECONDS.toMillis(seconds))
+      rateSource.setOffsetRange(Optional.empty(), Optional.empty())
+      (rateSource, rateSource.getEndOffset())
+    }
+  }
+
+  test("microbatch in registry") {
+    DataSource.lookupDataSource("ratev2", spark.sqlContext.conf).newInstance() match {
+      case ds: MicroBatchReadSupport =>
+        val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceV2Options.empty())
+        assert(reader.isInstanceOf[RateStreamMicroBatchReader])
+      case _ =>
+        throw new IllegalStateException("Could not find v2 read support for rate")
+    }
+  }
+
+  test("basic microbatch execution") {
+    val input = spark.readStream
+      .format("rateV2")
+      .option("numPartitions", "1")
+      .option("rowsPerSecond", "10")
+      .option("useManualClock", "true")
+      .load()
+    testStream(input, useV2Sink = true)(
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(v * 100L) -> v): _*),
+      StopStream,
+      StartStream(),
+      // Advance 2 seconds because creating a new RateSource will also create a new ManualClock
+      AdvanceRateManualClock(seconds = 2),
+      CheckLastBatch((10 until 20).map(v => new java.sql.Timestamp(v * 100L) -> v): _*)
+    )
+  }
+
   test("microbatch - numPartitions propagated") {
-    val reader = new RateStreamV2Reader(
+    val reader = new RateStreamMicroBatchReader(
       new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
     reader.setOffsetRange(Optional.empty(), Optional.empty())
     val tasks = reader.createReadTasks()
@@ -39,7 +83,7 @@ class RateSourceV2Suite extends StreamTest {
   }
 
   test("microbatch - set offset") {
-    val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+    val reader = new RateStreamMicroBatchReader(DataSourceV2Options.empty())
     val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
     val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 2000))))
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
@@ -48,7 +92,7 @@ class RateSourceV2Suite extends StreamTest {
   }
 
   test("microbatch - infer offsets") {
-    val reader = new RateStreamV2Reader(
+    val reader = new RateStreamMicroBatchReader(
       new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "100").asJava))
     reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
     reader.setOffsetRange(Optional.empty(), Optional.empty())
@@ -69,7 +113,7 @@ class RateSourceV2Suite extends StreamTest {
   }
 
   test("microbatch - predetermined batch size") {
-    val reader = new RateStreamV2Reader(
+    val reader = new RateStreamMicroBatchReader(
       new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "20").asJava))
     val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
     val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(20, 2000))))
@@ -80,7 +124,7 @@ class RateSourceV2Suite extends StreamTest {
   }
 
   test("microbatch - data read") {
-    val reader = new RateStreamV2Reader(
+    val reader = new RateStreamMicroBatchReader(
       new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
     val startOffset = RateStreamSourceV2.createInitialOffset(11, reader.creationTimeMs)
     val endOffset = RateStreamOffset(startOffset.partitionToValueAndRunTimeMs.toSeq.map {
@@ -107,14 +151,14 @@ class RateSourceV2Suite extends StreamTest {
     DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
       case ds: ContinuousReadSupport =>
         val reader = ds.createContinuousReader(Optional.empty(), "", DataSourceV2Options.empty())
-        assert(reader.isInstanceOf[ContinuousRateStreamReader])
+        assert(reader.isInstanceOf[RateStreamContinuousReader])
       case _ =>
         throw new IllegalStateException("Could not find v2 read support for rate")
     }
   }
 
   test("continuous data") {
-    val reader = new ContinuousRateStreamReader(
+    val reader = new RateStreamContinuousReader(
       new DataSourceV2Options(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
     reader.setOffset(Optional.empty())
     val tasks = reader.createReadTasks()
@@ -122,17 +166,17 @@ class RateSourceV2Suite extends StreamTest {
 
     val data = scala.collection.mutable.ListBuffer[Row]()
     tasks.asScala.foreach {
-      case t: RateStreamReadTask =>
+      case t: RateStreamContinuousReadTask =>
         val startTimeMs = reader.getStartOffset()
           .asInstanceOf[RateStreamOffset]
           .partitionToValueAndRunTimeMs(t.partitionIndex)
           .runTimeMs
-        val r = t.createDataReader().asInstanceOf[RateStreamDataReader]
+        val r = t.createDataReader().asInstanceOf[RateStreamContinuousDataReader]
         for (rowIndex <- 0 to 9) {
           r.next()
           data.append(r.get())
           assert(r.getOffset() ==
-            ContinuousRateStreamPartitionOffset(
+            RateStreamPartitionOffset(
               t.partitionIndex,
               t.partitionIndex + rowIndex * 2,
               startTimeMs + (rowIndex + 1) * 100))

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 4b7f0fb..d46461f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -105,7 +105,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
      * the active query, and then return the source object the data was added, as well as the
      * offset of added data.
      */
-    def addData(query: Option[StreamExecution]): (Source, Offset)
+    def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset)
   }
 
   /** A trait that can be extended when testing a source. */

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index eda0d8a..9562c10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -61,7 +61,7 @@ class ContinuousSuiteBase extends StreamTest {
       case s: ContinuousExecution =>
         assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
         val reader = s.lastExecution.executedPlan.collectFirst {
-          case DataSourceV2ScanExec(_, r: ContinuousRateStreamReader) => r
+          case DataSourceV2ScanExec(_, r: RateStreamContinuousReader) => r
         }.get
 
         val deltaMs = numTriggers * 1000 + 300


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org