Posted to commits@spark.apache.org by td...@apache.org on 2018/01/08 21:23:14 UTC
spark git commit: [SPARK-22912] v2 data source support in MicroBatchExecution
Repository: spark
Updated Branches:
refs/heads/master eed82a0b2 -> 4f7e75883
[SPARK-22912] v2 data source support in MicroBatchExecution
## What changes were proposed in this pull request?
Support for v2 data sources in microbatch streaming.
## How was this patch tested?
A very basic new unit test on the toy v2 implementation of the rate source. Once a v1 source has been fully migrated to v2, we'll need to do more detailed compatibility testing.
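For illustration, the public entry point the new test drives, as a minimal sketch (the "ratev2" short name and the options are taken from the changes below; useManualClock is test-only):

  val input = spark.readStream
    .format("ratev2")               // registered by RateSourceProviderV2 below
    .option("numPartitions", "1")
    .option("rowsPerSecond", "10")
    .load()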
Author: Jose Torres <jo...@databricks.com>
Closes #20097 from jose-torres/v2-impl.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7e7588
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7e7588
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7e7588
Branch: refs/heads/master
Commit: 4f7e75883436069c2d9028c4cd5daa78e8d59560
Parents: eed82a0
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Jan 8 13:24:08 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jan 8 13:24:08 2018 -0800
----------------------------------------------------------------------
....apache.spark.sql.sources.DataSourceRegister | 1 +
.../datasources/v2/DataSourceV2Relation.scala | 10 ++
.../streaming/MicroBatchExecution.scala | 112 +++++++++++++++----
.../execution/streaming/ProgressReporter.scala | 6 +-
.../streaming/RateSourceProvider.scala | 10 +-
.../execution/streaming/StreamExecution.scala | 4 +-
.../execution/streaming/StreamingRelation.scala | 4 +-
.../continuous/ContinuousExecution.scala | 4 +-
.../continuous/ContinuousRateStreamSource.scala | 17 +--
.../streaming/sources/RateStreamSourceV2.scala | 31 ++++-
.../spark/sql/streaming/DataStreamReader.scala | 25 ++++-
.../sql/streaming/StreamingQueryManager.scala | 24 ++--
.../execution/streaming/RateSourceV2Suite.scala | 68 +++++++++--
.../apache/spark/sql/streaming/StreamTest.scala | 2 +-
.../streaming/continuous/ContinuousSuite.scala | 2 +-
15 files changed, 241 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 6cdfe2f..0259c77 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -7,3 +7,4 @@ org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2
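This ServiceLoader entry is what makes the new provider resolvable by short name; a minimal sketch of the lookup, mirroring the registry test in RateSourceV2Suite further down:

  // Resolves RateSourceProviderV2 through the META-INF/services entry above.
  val ds = DataSource.lookupDataSource("ratev2", spark.sqlContext.conf).newInstance()
  assert(ds.isInstanceOf[MicroBatchReadSupport])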
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 7eb99a6..cba20dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -35,6 +35,16 @@ case class DataSourceV2Relation(
}
}
+/**
+ * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
+ * to the non-streaming relation.
+ */
+class StreamingDataSourceV2Relation(
+ fullOutput: Seq[AttributeReference],
+ reader: DataSourceV2Reader) extends DataSourceV2Relation(fullOutput, reader) {
+ override def isStreaming: Boolean = true
+}
+
object DataSourceV2Relation {
def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
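A minimal sketch of how the new streaming relation is constructed, assuming a v2 `reader: DataSourceV2Reader` is in scope (this matches its use in MicroBatchExecution and ContinuousExecution below):

  val relation = new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader)
  assert(relation.isStreaming)  // the only behavioral difference from DataSourceV2Relation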
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 9a7a13f..42240ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.execution.streaming
+import java.util.Optional
+
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
import org.apache.spark.sql.{Dataset, SparkSession}
@@ -24,7 +27,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.{Clock, Utils}
@@ -33,10 +39,11 @@ class MicroBatchExecution(
name: String,
checkpointRoot: String,
analyzedPlan: LogicalPlan,
- sink: Sink,
+ sink: BaseStreamingSink,
trigger: Trigger,
triggerClock: Clock,
outputMode: OutputMode,
+ extraOptions: Map[String, String],
deleteCheckpointOnStop: Boolean)
extends StreamExecution(
sparkSession, name, checkpointRoot, analyzedPlan, sink,
@@ -57,6 +64,13 @@ class MicroBatchExecution(
var nextSourceId = 0L
val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
+ // We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
+ // map as we go to ensure each identical relation gets the same StreamingExecutionRelation
+ // object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
+ // plan for the data within that batch.
+ // Note that we have to use the previous `output` as attributes in StreamingExecutionRelation,
+ // since the existing logical plan has already used those attributes. The per-microbatch
+ // transformation is responsible for replacing attributes with their final values.
val _logicalPlan = analyzedPlan.transform {
case streamingRelation@StreamingRelation(dataSource, _, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
@@ -64,19 +78,26 @@ class MicroBatchExecution(
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val source = dataSource.createSource(metadataPath)
nextSourceId += 1
- // We still need to use the previous `output` instead of `source.schema` as attributes in
- // "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)(sparkSession)
})
- case s @ StreamingRelationV2(v2DataSource, _, _, output, v1DataSource)
- if !v2DataSource.isInstanceOf[MicroBatchReadSupport] =>
+ case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
+ v2ToExecutionRelationMap.getOrElseUpdate(s, {
+ // Materialize source to avoid creating it in every batch
+ val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ val reader = source.createMicroBatchReader(
+ Optional.empty(), // user specified schema
+ metadataPath,
+ new DataSourceV2Options(options.asJava))
+ nextSourceId += 1
+ StreamingExecutionRelation(reader, output)(sparkSession)
+ })
+ case s @ StreamingRelationV2(_, _, _, output, v1Relation) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- val source = v1DataSource.createSource(metadataPath)
+ assert(v1Relation.isDefined, "v2 execution didn't match but v1 was unavailable")
+ val source = v1Relation.get.dataSource.createSource(metadataPath)
nextSourceId += 1
- // We still need to use the previous `output` instead of `source.schema` as attributes in
- // "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)(sparkSession)
})
}
@@ -192,7 +213,8 @@ class MicroBatchExecution(
source.getBatch(start, end)
}
case nonV1Tuple =>
- throw new IllegalStateException(s"Unexpected V2 source in $nonV1Tuple")
+ // The V2 API does not have the same edge case requiring getBatch to be called
+ // here, so we do nothing here.
}
currentBatchId = latestCommittedBatchId + 1
committedOffsets ++= availableOffsets
@@ -236,14 +258,27 @@ class MicroBatchExecution(
val hasNewData = {
awaitProgressLock.lock()
try {
- val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map {
+ // Generate a map from each unique source to the next available offset.
+ val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
case s: Source =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
(s, s.getOffset)
}
+ case s: MicroBatchReader =>
+ updateStatusMessage(s"Getting offsets from $s")
+ reportTimeTaken("getOffset") {
+ // Once v1 streaming source execution is gone, we can refactor this away.
+ // For now, we set the range here to get the source to infer the available end offset,
+ // get that offset, and then set the range again when we later execute.
+ s.setOffsetRange(
+ toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
+ Optional.empty())
+
+ (s, Some(s.getEndOffset))
+ }
}.toMap
- availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
+ availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
if (dataAvailable) {
true
@@ -317,6 +352,8 @@ class MicroBatchExecution(
if (prevBatchOff.isDefined) {
prevBatchOff.get.toStreamProgress(sources).foreach {
case (src: Source, off) => src.commit(off)
+ case (reader: MicroBatchReader, off) =>
+ reader.commit(reader.deserializeOffset(off.json))
}
} else {
throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
@@ -357,7 +394,16 @@ class MicroBatchExecution(
s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
s"${batch.queryExecution.logical}")
logDebug(s"Retrieving data from $source: $current -> $available")
- Some(source -> batch)
+ Some(source -> batch.logicalPlan)
+ case (reader: MicroBatchReader, available)
+ if committedOffsets.get(reader).map(_ != available).getOrElse(true) =>
+ val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json))
+ reader.setOffsetRange(
+ toJava(current),
+ Optional.of(available.asInstanceOf[OffsetV2]))
+ logDebug(s"Retrieving data from $reader: $current -> $available")
+ Some(reader ->
+ new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
case _ => None
}
}
@@ -365,15 +411,14 @@ class MicroBatchExecution(
// A list of attributes that will need to be updated.
val replacements = new ArrayBuffer[(Attribute, Attribute)]
// Replace sources in the logical plan with data that has arrived since the last batch.
- val withNewSources = logicalPlan transform {
+ val newBatchesPlan = logicalPlan transform {
case StreamingExecutionRelation(source, output) =>
- newData.get(source).map { data =>
- val newPlan = data.logicalPlan
- assert(output.size == newPlan.output.size,
+ newData.get(source).map { dataPlan =>
+ assert(output.size == dataPlan.output.size,
s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
- s"${Utils.truncatedString(newPlan.output, ",")}")
- replacements ++= output.zip(newPlan.output)
- newPlan
+ s"${Utils.truncatedString(dataPlan.output, ",")}")
+ replacements ++= output.zip(dataPlan.output)
+ dataPlan
}.getOrElse {
LocalRelation(output, isStreaming = true)
}
@@ -381,7 +426,7 @@ class MicroBatchExecution(
// Rewire the plan to use the new attributes that were returned by the source.
val replacementMap = AttributeMap(replacements)
- val triggerLogicalPlan = withNewSources transformAllExpressions {
+ val newAttributePlan = newBatchesPlan transformAllExpressions {
case a: Attribute if replacementMap.contains(a) =>
replacementMap(a).withMetadata(a.metadata)
case ct: CurrentTimestamp =>
@@ -392,6 +437,20 @@ class MicroBatchExecution(
cd.dataType, cd.timeZoneId)
}
+ val triggerLogicalPlan = sink match {
+ case _: Sink => newAttributePlan
+ case s: MicroBatchWriteSupport =>
+ val writer = s.createMicroBatchWriter(
+ s"$runId",
+ currentBatchId,
+ newAttributePlan.schema,
+ outputMode,
+ new DataSourceV2Options(extraOptions.asJava))
+ assert(writer.isPresent, "microbatch writer must always be present")
+ WriteToDataSourceV2(writer.get, newAttributePlan)
+ case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
+ }
+
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionToRunBatch,
@@ -409,7 +468,12 @@ class MicroBatchExecution(
reportTimeTaken("addBatch") {
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
- sink.addBatch(currentBatchId, nextBatch)
+ sink match {
+ case s: Sink => s.addBatch(currentBatchId, nextBatch)
+ case s: MicroBatchWriteSupport =>
+ // This doesn't accumulate any data - it just forces execution of the microbatch writer.
+ nextBatch.collect()
+ }
}
}
@@ -421,4 +485,8 @@ class MicroBatchExecution(
awaitProgressLock.unlock()
}
}
+
+ private def toJava(scalaOption: Option[OffsetV2]): Optional[OffsetV2] = {
+ Optional.ofNullable(scalaOption.orNull)
+ }
}
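To restate the handshake introduced above: offsets are discovered by setting a half-open range and asking the reader for its end offset, and the range is pinned again when the batch actually executes. A minimal sketch, assuming `reader: MicroBatchReader` and the last committed offset `current: Option[OffsetV2]`:

  // Phase 1 (offset discovery): let the source infer the latest available end offset.
  reader.setOffsetRange(Optional.ofNullable(current.orNull), Optional.empty())
  val available = reader.getEndOffset
  // Phase 2 (batch execution): pin the exact range for this microbatch.
  reader.setOffsetRange(Optional.ofNullable(current.orNull), Optional.of(available))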
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 1c90436..d1e5be9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -53,7 +53,7 @@ trait ProgressReporter extends Logging {
protected def triggerClock: Clock
protected def logicalPlan: LogicalPlan
protected def lastExecution: QueryExecution
- protected def newData: Map[BaseStreamingSource, DataFrame]
+ protected def newData: Map[BaseStreamingSource, LogicalPlan]
protected def availableOffsets: StreamProgress
protected def committedOffsets: StreamProgress
protected def sources: Seq[BaseStreamingSource]
@@ -225,8 +225,8 @@ trait ProgressReporter extends Logging {
//
// 3. For each source, we sum the metrics of the associated execution plan leaves.
//
- val logicalPlanLeafToSource = newData.flatMap { case (source, df) =>
- df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
+ val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
+ logicalPlan.collectLeaves().map { leaf => leaf -> source }
}
val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index d02cf88..66eb016 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -29,12 +29,12 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamReader
-import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader
+import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader
+import org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
-import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader}
import org.apache.spark.sql.types._
import org.apache.spark.util.{ManualClock, SystemClock}
@@ -112,7 +112,7 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister
schema: Optional[StructType],
checkpointLocation: String,
options: DataSourceV2Options): ContinuousReader = {
- new ContinuousRateStreamReader(options)
+ new RateStreamContinuousReader(options)
}
override def shortName(): String = "rate"
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3e76bf7..24a8b00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -163,7 +163,7 @@ abstract class StreamExecution(
var lastExecution: IncrementalExecution = _
/** Holds the most recent input data for each source. */
- protected var newData: Map[BaseStreamingSource, DataFrame] = _
+ protected var newData: Map[BaseStreamingSource, LogicalPlan] = _
@volatile
protected var streamDeathCause: StreamingQueryException = null
@@ -418,7 +418,7 @@ abstract class StreamExecution(
* Blocks the current thread until processing for data from the given `source` has reached at
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
- private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+ private[sql] def awaitOffset(source: BaseStreamingSource, newOffset: Offset): Unit = {
assertAwaitThread()
def notDone = {
val localCommittedOffsets = committedOffsets
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index a9d50e3..a0ee683 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -61,7 +61,7 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
*/
case class StreamingExecutionRelation(
- source: Source,
+ source: BaseStreamingSource,
output: Seq[Attribute])(session: SparkSession)
extends LeafNode {
@@ -92,7 +92,7 @@ case class StreamingRelationV2(
sourceName: String,
extraOptions: Map[String, String],
output: Seq[Attribute],
- v1DataSource: DataSource)(session: SparkSession)
+ v1Relation: Option[StreamingRelation])(session: SparkSession)
extends LeafNode {
override def isStreaming: Boolean = true
override def toString: String = sourceName
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 2843ab1..9657b5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
@@ -174,7 +174,7 @@ class ContinuousExecution(
val loggedOffset = offsets.offsets(0)
val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
- DataSourceV2Relation(newOutput, reader)
+ new StreamingDataSourceV2Relation(newOutput, reader)
}
// Rewire the plan to use the new attributes that were returned by the source.
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index c9aa78a..b4b21e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
-case class ContinuousRateStreamPartitionOffset(
+case class RateStreamPartitionOffset(
partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
-class ContinuousRateStreamReader(options: DataSourceV2Options)
+class RateStreamContinuousReader(options: DataSourceV2Options)
extends ContinuousReader {
implicit val defaultFormats: DefaultFormats = DefaultFormats
@@ -48,7 +48,7 @@ class ContinuousRateStreamReader(options: DataSourceV2Options)
override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
assert(offsets.length == numPartitions)
val tuples = offsets.map {
- case ContinuousRateStreamPartitionOffset(i, currVal, nextRead) =>
+ case RateStreamPartitionOffset(i, currVal, nextRead) =>
(i, ValueRunTimeMsPair(currVal, nextRead))
}
RateStreamOffset(Map(tuples: _*))
@@ -86,7 +86,7 @@ class ContinuousRateStreamReader(options: DataSourceV2Options)
val start = partitionStartMap(i)
// Have each partition advance by numPartitions each row, with starting points staggered
// by their partition index.
- RateStreamReadTask(
+ RateStreamContinuousReadTask(
start.value,
start.runTimeMs,
i,
@@ -101,7 +101,7 @@ class ContinuousRateStreamReader(options: DataSourceV2Options)
}
-case class RateStreamReadTask(
+case class RateStreamContinuousReadTask(
startValue: Long,
startTimeMs: Long,
partitionIndex: Int,
@@ -109,10 +109,11 @@ case class RateStreamReadTask(
rowsPerSecond: Double)
extends ReadTask[Row] {
override def createDataReader(): DataReader[Row] =
- new RateStreamDataReader(startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
+ new RateStreamContinuousDataReader(
+ startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
}
-class RateStreamDataReader(
+class RateStreamContinuousDataReader(
startValue: Long,
startTimeMs: Long,
partitionIndex: Int,
@@ -151,5 +152,5 @@ class RateStreamDataReader(
override def close(): Unit = {}
override def getOffset(): PartitionOffset =
- ContinuousRateStreamPartitionOffset(partitionIndex, currentValue, nextReadTime)
+ RateStreamPartitionOffset(partitionIndex, currentValue, nextReadTime)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
index 97bada08..c0ed12c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
@@ -28,17 +28,38 @@ import org.json4s.jackson.Serialization
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{ManualClock, SystemClock}
-class RateStreamV2Reader(options: DataSourceV2Options)
+/**
+ * This is a temporary register as we build out v2 migration. Microbatch read support should
+ * be implemented in the same register as v1.
+ */
+class RateSourceProviderV2 extends DataSourceV2 with MicroBatchReadSupport with DataSourceRegister {
+ override def createMicroBatchReader(
+ schema: Optional[StructType],
+ checkpointLocation: String,
+ options: DataSourceV2Options): MicroBatchReader = {
+ new RateStreamMicroBatchReader(options)
+ }
+
+ override def shortName(): String = "ratev2"
+}
+
+class RateStreamMicroBatchReader(options: DataSourceV2Options)
extends MicroBatchReader {
implicit val defaultFormats: DefaultFormats = DefaultFormats
- val clock = new SystemClock
+ val clock = {
+ // The option to use a manual clock is provided only for unit testing purposes.
+ if (options.get("useManualClock").orElse("false").toBoolean) new ManualClock
+ else new SystemClock
+ }
private val numPartitions =
options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
@@ -111,7 +132,7 @@ class RateStreamV2Reader(options: DataSourceV2Options)
val packedRows = mutable.ListBuffer[(Long, Long)]()
var outVal = startVal + numPartitions
- var outTimeMs = startTimeMs + msPerPartitionBetweenRows
+ var outTimeMs = startTimeMs
while (outVal <= endVal) {
packedRows.append((outTimeMs, outVal))
outVal += numPartitions
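The clock selection above is driven entirely by options; a minimal sketch of constructing the reader under manual-clock control, as the new unit tests do (requires scala.collection.JavaConverters._ for asJava):

  val reader = new RateStreamMicroBatchReader(
    new DataSourceV2Options(Map("useManualClock" -> "true").asJava))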
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 2e92bee..52f2e26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.streaming
-import java.util.Locale
+import java.util.{Locale, Optional}
import scala.collection.JavaConverters._
@@ -27,8 +27,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -166,19 +167,31 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
+ val v1Relation = ds match {
+ case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
+ case _ => None
+ }
ds match {
+ case s: MicroBatchReadSupport =>
+ val tempReader = s.createMicroBatchReader(
+ Optional.ofNullable(userSpecifiedSchema.orNull),
+ Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
+ options)
+ Dataset.ofRows(
+ sparkSession,
+ StreamingRelationV2(
+ s, source, extraOptions.toMap,
+ tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
case s: ContinuousReadSupport =>
val tempReader = s.createContinuousReader(
- java.util.Optional.ofNullable(userSpecifiedSchema.orNull),
+ Optional.ofNullable(userSpecifiedSchema.orNull),
Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
options)
- // Generate the V1 node to catch errors thrown within generation.
- StreamingRelation(v1DataSource)
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
s, source, extraOptions.toMap,
- tempReader.readSchema().toAttributes, v1DataSource)(sparkSession))
+ tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
case _ =>
// Code path for data source v1.
Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
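Note the dispatch order: MicroBatchReadSupport wins, then ContinuousReadSupport, and anything else falls through to the v1 path. A sketch using the two sources touched by this patch:

  // "rate" is a v1 StreamSourceProvider that also offers ContinuousReadSupport,
  // so it takes the continuous branch but keeps a v1Relation fallback;
  // "ratev2" offers MicroBatchReadSupport and takes the first branch.
  val v1Backed = spark.readStream.format("rate").load()
  val v2Micro  = spark.readStream.format("ratev2").load()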
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index b508f44..4b27e0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -29,10 +29,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger}
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
@@ -240,31 +240,35 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
"is not supported in streaming DataFrames/Datasets and will be disabled.")
}
- sink match {
- case v1Sink: Sink =>
- new StreamingQueryWrapper(new MicroBatchExecution(
+ (sink, trigger) match {
+ case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) =>
+ UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
+ new StreamingQueryWrapper(new ContinuousExecution(
sparkSession,
userSpecifiedName.orNull,
checkpointLocation,
analyzedPlan,
- v1Sink,
+ v2Sink,
trigger,
triggerClock,
outputMode,
+ extraOptions,
deleteCheckpointOnStop))
- case v2Sink: ContinuousWriteSupport =>
- UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
- new StreamingQueryWrapper(new ContinuousExecution(
+ case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
+ new StreamingQueryWrapper(new MicroBatchExecution(
sparkSession,
userSpecifiedName.orNull,
checkpointLocation,
analyzedPlan,
- v2Sink,
+ sink,
trigger,
triggerClock,
outputMode,
extraOptions,
deleteCheckpointOnStop))
+ case (_: ContinuousWriteSupport, t) if !t.isInstanceOf[ContinuousTrigger] =>
+ throw new AnalysisException(
+ "Sink only supports continuous writes, but a continuous trigger was not specified.")
}
}
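The reordered match also fails fast when a continuous-only sink is paired with a non-continuous trigger; a sketch of the rejected combination (the sink name here is hypothetical):

  df.writeStream
    .format("my-continuous-only-sink")           // hypothetical ContinuousWriteSupport-only sink
    .trigger(Trigger.ProcessingTime("1 second")) // not a ContinuousTrigger
    .start()                                      // throws the AnalysisException above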
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index e11705a..85085d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -18,20 +18,64 @@
package org.apache.spark.sql.execution.streaming
import java.util.Optional
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamSourceV2, RateStreamV2Reader}
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamMicroBatchReader, RateStreamSourceV2}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.util.ManualClock
class RateSourceV2Suite extends StreamTest {
+ import testImplicits._
+
+ case class AdvanceRateManualClock(seconds: Long) extends AddData {
+ override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
+ assert(query.nonEmpty)
+ val rateSource = query.get.logicalPlan.collect {
+ case StreamingExecutionRelation(source: RateStreamMicroBatchReader, _) => source
+ }.head
+ rateSource.clock.asInstanceOf[ManualClock].advance(TimeUnit.SECONDS.toMillis(seconds))
+ rateSource.setOffsetRange(Optional.empty(), Optional.empty())
+ (rateSource, rateSource.getEndOffset())
+ }
+ }
+
+ test("microbatch in registry") {
+ DataSource.lookupDataSource("ratev2", spark.sqlContext.conf).newInstance() match {
+ case ds: MicroBatchReadSupport =>
+ val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceV2Options.empty())
+ assert(reader.isInstanceOf[RateStreamMicroBatchReader])
+ case _ =>
+ throw new IllegalStateException("Could not find v2 read support for rate")
+ }
+ }
+
+ test("basic microbatch execution") {
+ val input = spark.readStream
+ .format("rateV2")
+ .option("numPartitions", "1")
+ .option("rowsPerSecond", "10")
+ .option("useManualClock", "true")
+ .load()
+ testStream(input, useV2Sink = true)(
+ AdvanceRateManualClock(seconds = 1),
+ CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(v * 100L) -> v): _*),
+ StopStream,
+ StartStream(),
+ // Advance 2 seconds because creating a new RateSource will also create a new ManualClock
+ AdvanceRateManualClock(seconds = 2),
+ CheckLastBatch((10 until 20).map(v => new java.sql.Timestamp(v * 100L) -> v): _*)
+ )
+ }
+
test("microbatch - numPartitions propagated") {
- val reader = new RateStreamV2Reader(
+ val reader = new RateStreamMicroBatchReader(
new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
reader.setOffsetRange(Optional.empty(), Optional.empty())
val tasks = reader.createReadTasks()
@@ -39,7 +83,7 @@ class RateSourceV2Suite extends StreamTest {
}
test("microbatch - set offset") {
- val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+ val reader = new RateStreamMicroBatchReader(DataSourceV2Options.empty())
val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 2000))))
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
@@ -48,7 +92,7 @@ class RateSourceV2Suite extends StreamTest {
}
test("microbatch - infer offsets") {
- val reader = new RateStreamV2Reader(
+ val reader = new RateStreamMicroBatchReader(
new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "100").asJava))
reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
reader.setOffsetRange(Optional.empty(), Optional.empty())
@@ -69,7 +113,7 @@ class RateSourceV2Suite extends StreamTest {
}
test("microbatch - predetermined batch size") {
- val reader = new RateStreamV2Reader(
+ val reader = new RateStreamMicroBatchReader(
new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "20").asJava))
val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(20, 2000))))
@@ -80,7 +124,7 @@ class RateSourceV2Suite extends StreamTest {
}
test("microbatch - data read") {
- val reader = new RateStreamV2Reader(
+ val reader = new RateStreamMicroBatchReader(
new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
val startOffset = RateStreamSourceV2.createInitialOffset(11, reader.creationTimeMs)
val endOffset = RateStreamOffset(startOffset.partitionToValueAndRunTimeMs.toSeq.map {
@@ -107,14 +151,14 @@ class RateSourceV2Suite extends StreamTest {
DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
case ds: ContinuousReadSupport =>
val reader = ds.createContinuousReader(Optional.empty(), "", DataSourceV2Options.empty())
- assert(reader.isInstanceOf[ContinuousRateStreamReader])
+ assert(reader.isInstanceOf[RateStreamContinuousReader])
case _ =>
throw new IllegalStateException("Could not find v2 read support for rate")
}
}
test("continuous data") {
- val reader = new ContinuousRateStreamReader(
+ val reader = new RateStreamContinuousReader(
new DataSourceV2Options(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
reader.setOffset(Optional.empty())
val tasks = reader.createReadTasks()
@@ -122,17 +166,17 @@ class RateSourceV2Suite extends StreamTest {
val data = scala.collection.mutable.ListBuffer[Row]()
tasks.asScala.foreach {
- case t: RateStreamReadTask =>
+ case t: RateStreamContinuousReadTask =>
val startTimeMs = reader.getStartOffset()
.asInstanceOf[RateStreamOffset]
.partitionToValueAndRunTimeMs(t.partitionIndex)
.runTimeMs
- val r = t.createDataReader().asInstanceOf[RateStreamDataReader]
+ val r = t.createDataReader().asInstanceOf[RateStreamContinuousDataReader]
for (rowIndex <- 0 to 9) {
r.next()
data.append(r.get())
assert(r.getOffset() ==
- ContinuousRateStreamPartitionOffset(
+ RateStreamPartitionOffset(
t.partitionIndex,
t.partitionIndex + rowIndex * 2,
startTimeMs + (rowIndex + 1) * 100))
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 4b7f0fb..d46461f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -105,7 +105,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
* the active query, and then return the source object the data was added, as well as the
* offset of added data.
*/
- def addData(query: Option[StreamExecution]): (Source, Offset)
+ def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset)
}
/** A trait that can be extended when testing a source. */
http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index eda0d8a..9562c10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -61,7 +61,7 @@ class ContinuousSuiteBase extends StreamTest {
case s: ContinuousExecution =>
assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
val reader = s.lastExecution.executedPlan.collectFirst {
- case DataSourceV2ScanExec(_, r: ContinuousRateStreamReader) => r
+ case DataSourceV2ScanExec(_, r: RateStreamContinuousReader) => r
}.get
val deltaMs = numTriggers * 1000 + 300