Posted to commits@spark.apache.org by li...@apache.org on 2019/01/29 08:07:37 UTC
[spark] branch master updated: [SPARK-26695][SQL] data source v2 API refactor - continuous read
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e97ab1d [SPARK-26695][SQL] data source v2 API refactor - continuous read
e97ab1d is described below
commit e97ab1d9807134bb557ae73920af61e8534b2b08
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Jan 29 00:07:27 2019 -0800
[SPARK-26695][SQL] data source v2 API refactor - continuous read
## What changes were proposed in this pull request?
Following https://github.com/apache/spark/pull/23430, this PR applies the API refactor to continuous read, per the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)
The major changes:
1. Rename `XXXContinuousReadSupport` to `XXXContinuousStream` (sketched below).
2. At the beginning of continuous streaming execution, convert `StreamingRelationV2` directly to `StreamingDataSourceV2Relation` instead of `StreamingExecutionRelation`.
3. Remove all the remaining hacks, as the read-side API refactor is now complete.
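To make the new shape concrete, here is a minimal sketch, not taken from this patch, of how a source wires up continuous reads after the refactor. `ExampleTable` is a hypothetical table and the stream construction is left as a stub; the real wiring for Kafka is in `KafkaSourceProvider`/`KafkaContinuousStream` below.
```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, Table}
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream
import org.apache.spark.sql.types.StructType

// Hypothetical table: continuous support is declared on the Table, and the stream is
// created from the Scan instead of via the removed ContinuousReadSupportProvider.
class ExampleTable extends Table with SupportsContinuousRead {
  override def name(): String = "example"
  override def schema(): StructType = new StructType().add("value", "long")

  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
    override def build(): Scan = new Scan {
      override def readSchema(): StructType = schema()
      override def toContinuousStream(checkpointLocation: String): ContinuousStream =
        ??? // return the source's ContinuousStream (the renamed XXXContinuousStream)
    }
  }
}
```
With this shape, Spark no longer goes through `ContinuousReadSupportProvider`: it obtains a `ScanBuilder` from the `Table` and calls `Scan.toContinuousStream` with the checkpoint location.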
## How was this patch tested?
Existing tests.
Closes #23619 from cloud-fan/continuous.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
---
...adSupport.scala => KafkaContinuousStream.scala} | 90 ++++++---------
.../spark/sql/kafka010/KafkaSourceProvider.scala | 79 ++++++-------
.../sql/kafka010/KafkaContinuousSourceSuite.scala | 8 +-
.../spark/sql/kafka010/KafkaContinuousTest.scala | 4 +-
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 20 +---
.../sql/sources/v2/BatchReadSupportProvider.java | 61 ----------
.../sources/v2/ContinuousReadSupportProvider.java | 70 ------------
.../apache/spark/sql/sources/v2/DataSourceV2.java | 2 +-
...figBuilder.java => SupportsContinuousRead.java} | 18 +--
.../apache/spark/sql/sources/v2/reader/Batch.java | 2 +-
.../sql/sources/v2/reader/BatchReadSupport.java | 51 ---------
.../sql/sources/v2/reader/InputPartition.java | 2 +-
.../v2/reader/OldSupportsReportPartitioning.java | 38 -------
.../v2/reader/OldSupportsReportStatistics.java | 38 -------
.../spark/sql/sources/v2/reader/ReadSupport.java | 50 --------
.../apache/spark/sql/sources/v2/reader/Scan.java | 22 +++-
.../spark/sql/sources/v2/reader/ScanConfig.java | 45 --------
.../v2/reader/SupportsPushDownRequiredColumns.java | 2 +-
.../v2/reader/SupportsReportPartitioning.java | 6 +-
.../v2/reader/SupportsReportStatistics.java | 4 +-
.../v2/reader/streaming/ContinuousReadSupport.java | 77 -------------
...MicroBatchStream.java => ContinuousStream.java} | 43 ++++---
.../v2/reader/streaming/MicroBatchStream.java | 2 +-
.../sql/sources/v2/reader/streaming/Offset.java | 2 +-
.../v2/reader/streaming/SparkDataStream.java | 3 +-
.../v2/reader/streaming/StreamingReadSupport.java | 52 ---------
.../execution/datasources/v2/BatchScanExec.scala | 49 ++++++++
.../datasources/v2/ContinuousScanExec.scala | 99 ++++------------
.../datasources/v2/DataSourceV2Relation.scala | 48 +-------
...anExec.scala => DataSourceV2ScanExecBase.scala} | 39 ++-----
.../datasources/v2/DataSourceV2Strategy.scala | 16 ++-
.../datasources/v2/MicroBatchScanExec.scala | 55 ++-------
.../execution/streaming/MicroBatchExecution.scala | 2 +-
.../SimpleStreamingScanConfigBuilder.scala | 40 -------
.../execution/streaming/StreamingRelation.scala | 26 +----
.../streaming/continuous/ContinuousExecution.scala | 126 +++++++++------------
.../continuous/ContinuousRateStreamSource.scala | 26 ++---
.../continuous/ContinuousTextSocketSource.scala | 39 ++-----
.../streaming/continuous/EpochCoordinator.scala | 10 +-
.../spark/sql/execution/streaming/memory.scala | 33 +++---
.../streaming/sources/ContinuousMemoryStream.scala | 27 +----
.../streaming/sources/RateStreamProvider.scala | 18 ++-
.../sources/TextSocketMicroBatchStream.scala | 8 +-
.../sources/TextSocketSourceProvider.scala | 25 ++--
.../spark/sql/streaming/DataStreamReader.scala | 35 +-----
.../sources/RateStreamProviderSuite.scala | 29 ++---
.../streaming/sources/TextSocketStreamSuite.scala | 61 +++++-----
.../spark/sql/sources/v2/DataSourceV2Suite.scala | 8 +-
.../apache/spark/sql/streaming/StreamSuite.scala | 12 +-
.../apache/spark/sql/streaming/StreamTest.scala | 11 +-
.../spark/sql/streaming/StreamingQuerySuite.scala | 4 +-
.../ContinuousQueuedDataReaderSuite.scala | 4 +-
.../sql/streaming/continuous/ContinuousSuite.scala | 2 +-
.../continuous/EpochCoordinatorSuite.scala | 6 +-
.../sources/StreamingDataSourceV2Suite.scala | 88 ++++++--------
55 files changed, 474 insertions(+), 1263 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
similarity index 84%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index f328567..0e61717 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -30,10 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.types.StructType
/**
- * A [[ContinuousReadSupport]] for data from kafka.
+ * A [[ContinuousStream]] for data from kafka.
*
* @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
* read by per-task consumers generated later.
@@ -46,17 +45,23 @@ import org.apache.spark.sql.types.StructType
* scenarios, where some offsets after the specified initial ones can't be
* properly read.
*/
-class KafkaContinuousReadSupport(
+class KafkaContinuousStream(
offsetReader: KafkaOffsetReader,
kafkaParams: ju.Map[String, Object],
sourceOptions: Map[String, String],
metadataPath: String,
initialOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
- extends ContinuousReadSupport with Logging {
+ extends ContinuousStream with Logging {
private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+ // Initialized when creating reader factories. If this diverges from the partitions at the latest
+ // offsets, we need to reconfigure.
+ // Exposed outside this object only for unit tests.
+ @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
+
+
override def initialOffset(): Offset = {
val offsets = initialOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
@@ -67,18 +72,32 @@ class KafkaContinuousReadSupport(
offsets
}
- override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
- override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
- new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss)
- }
-
override def deserializeOffset(json: String): Offset = {
KafkaSourceOffset(JsonUtils.partitionOffsets(json))
}
- override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
- val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets
+ override def planInputPartitions(start: Offset): Array[InputPartition] = {
+ val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
+
+ val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
+ val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
+ val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+
+ val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
+ if (deletedPartitions.nonEmpty) {
+ val message = if (
+ offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+ } else {
+ s"$deletedPartitions are gone. Some data may have been missed."
+ }
+ reportDataLoss(message)
+ }
+
+ val startOffsets = newPartitionOffsets ++
+ oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
+ knownPartitions = startOffsets.keySet
+
startOffsets.toSeq.map {
case (topicPartition, start) =>
KafkaContinuousInputPartition(
@@ -86,8 +105,7 @@ class KafkaContinuousReadSupport(
}.toArray
}
- override def createContinuousReaderFactory(
- config: ScanConfig): ContinuousPartitionReaderFactory = {
+ override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
KafkaContinuousReaderFactory
}
@@ -105,8 +123,7 @@ class KafkaContinuousReadSupport(
KafkaSourceOffset(mergedMap)
}
- override def needsReconfiguration(config: ScanConfig): Boolean = {
- val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
+ override def needsReconfiguration(): Boolean = {
offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
}
@@ -151,47 +168,6 @@ object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory {
}
}
-class KafkaContinuousScanConfigBuilder(
- schema: StructType,
- startOffset: Offset,
- offsetReader: KafkaOffsetReader,
- reportDataLoss: String => Unit)
- extends ScanConfigBuilder {
-
- override def build(): ScanConfig = {
- val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset)
-
- val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
- val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
- val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-
- val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
- if (deletedPartitions.nonEmpty) {
- val message = if (
- offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
- s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
- } else {
- s"$deletedPartitions are gone. Some data may have been missed."
- }
- reportDataLoss(message)
- }
-
- val startOffsets = newPartitionOffsets ++
- oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
- KafkaContinuousScanConfig(schema, startOffsets)
- }
-}
-
-case class KafkaContinuousScanConfig(
- readSchema: StructType,
- startOffsets: Map[TopicPartition, Long])
- extends ScanConfig {
-
- // Created when building the scan config builder. If this diverges from the partitions at the
- // latest offsets, we need to reconfigure the kafka read support.
- def knownPartitions: Set[TopicPartition] = startOffsets.keySet
-}
-
/**
* A per-task data reader for continuous Kafka processing.
*
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 58c90b8..9238899b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -48,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with RelationProvider
with CreatableRelationProvider
with StreamingWriteSupportProvider
- with ContinuousReadSupportProvider
with TableProvider
with Logging {
import KafkaSourceProvider._
@@ -108,46 +107,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
/**
- * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
- * Kafka data in a continuous streaming query.
- */
- override def createContinuousReadSupport(
- metadataPath: String,
- options: DataSourceOptions): KafkaContinuousReadSupport = {
- val parameters = options.asMap().asScala.toMap
- validateStreamOptions(parameters)
- // Each running query should use its own group id. Otherwise, the query may be only assigned
- // partial data since Kafka will assign partitions to multiple consumers having the same group
- // id. Hence, we should generate a unique id for each query.
- val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
-
- val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
- val specifiedKafkaParams =
- parameters
- .keySet
- .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
- .map { k => k.drop(6).toString -> parameters(k) }
- .toMap
-
- val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
- STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
- val kafkaOffsetReader = new KafkaOffsetReader(
- strategy(caseInsensitiveParams),
- kafkaParamsForDriver(specifiedKafkaParams),
- parameters,
- driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
- new KafkaContinuousReadSupport(
- kafkaOffsetReader,
- kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
- parameters,
- metadataPath,
- startingStreamOffsets,
- failOnDataLoss(caseInsensitiveParams))
- }
-
- /**
* Returns a new base relation with the given parameters.
*
* @note The parameters' keywords are case insensitive and this insensitivity is enforced
@@ -406,7 +365,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
class KafkaTable(strategy: => ConsumerStrategy) extends Table
- with SupportsMicroBatchRead {
+ with SupportsMicroBatchRead with SupportsContinuousRead {
override def name(): String = s"Kafka $strategy"
@@ -449,6 +408,40 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
}
+
+ override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+ val parameters = options.asMap().asScala.toMap
+ validateStreamOptions(parameters)
+ // Each running query should use its own group id. Otherwise, the query may be only assigned
+ // partial data since Kafka will assign partitions to multiple consumers having the same group
+ // id. Hence, we should generate a unique id for each query.
+ val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)
+
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
+ val specifiedKafkaParams =
+ parameters
+ .keySet
+ .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
+ .map { k => k.drop(6).toString -> parameters(k) }
+ .toMap
+
+ val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
+ caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+
+ val kafkaOffsetReader = new KafkaOffsetReader(
+ strategy(caseInsensitiveParams),
+ kafkaParamsForDriver(specifiedKafkaParams),
+ parameters,
+ driverGroupIdPrefix = s"$uniqueGroupId-driver")
+
+ new KafkaContinuousStream(
+ kafkaOffsetReader,
+ kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+ parameters,
+ checkpointLocation,
+ startingStreamOffsets,
+ failOnDataLoss(caseInsensitiveParams))
+ }
}
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index 2f7fd7f..be0cea2 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -209,11 +209,11 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
assert(
query.lastExecution.executedPlan.collectFirst {
case scan: ContinuousScanExec
- if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
- scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
- }.exists { config =>
+ if scan.stream.isInstanceOf[KafkaContinuousStream] =>
+ scan.stream.asInstanceOf[KafkaContinuousStream]
+ }.exists { stream =>
// Ensure the new topic is present and the old topic is gone.
- config.knownPartitions.exists(_.topic == topic2)
+ stream.knownPartitions.exists(_.topic == topic2)
},
s"query never reconfigured to new topic $topic2")
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index fa3b623..ad1c2c5 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -48,8 +48,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
assert(
query.lastExecution.executedPlan.collectFirst {
case scan: ContinuousScanExec
- if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
- scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
+ if scan.stream.isInstanceOf[KafkaContinuousStream] =>
+ scan.stream.asInstanceOf[KafkaContinuousStream]
}.exists(_.knownPartitions.size == newCount),
s"query never reconfigured to $newCount partitions")
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 90b5015..aa7baac 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -28,14 +28,13 @@ import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random
-import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
-import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation}
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -118,17 +117,10 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
val sources: Seq[BaseStreamingSource] = {
query.get.logicalPlan.collect {
case StreamingExecutionRelation(source: KafkaSource, _) => source
- case r: StreamingDataSourceV2Relation
- if r.stream.isInstanceOf[KafkaMicroBatchStream] =>
- r.stream.asInstanceOf[KafkaMicroBatchStream]
- } ++ (query.get.lastExecution match {
- case null => Seq()
- case e => e.logical.collect {
- case r: OldStreamingDataSourceV2Relation
- if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
- r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
- }
- })
+ case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
+ r.stream.isInstanceOf[KafkaContinuousStream] =>
+ r.stream
+ }
}.distinct
if (sources.isEmpty) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
deleted file mode 100644
index 2a4933d..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
-import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for batch processing.
- *
- * This interface is used to create {@link BatchReadSupport} instances when end users run
- * {@code SparkSession.read.format(...).option(...).load()}.
- */
-@Evolving
-public interface BatchReadSupportProvider extends DataSourceV2 {
-
- /**
- * Creates a {@link BatchReadSupport} instance to load the data from this data source with a user
- * specified schema, which is called by Spark at the beginning of each batch query.
- *
- * Spark will call this method at the beginning of each batch query to create a
- * {@link BatchReadSupport} instance.
- *
- * By default this method throws {@link UnsupportedOperationException}, implementations should
- * override this method to handle user specified schema.
- *
- * @param schema the user specified schema.
- * @param options the options for the returned data source reader, which is an immutable
- * case-insensitive string-to-string map.
- */
- default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
- return DataSourceV2Utils.failForUserSpecifiedSchema(this);
- }
-
- /**
- * Creates a {@link BatchReadSupport} instance to scan the data from this data source, which is
- * called by Spark at the beginning of each batch query.
- *
- * @param options the options for the returned data source reader, which is an immutable
- * case-insensitive string-to-string map.
- */
- BatchReadSupport createBatchReadSupport(DataSourceOptions options);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
deleted file mode 100644
index b4f2eb3..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for continuous stream processing.
- *
- * This interface is used to create {@link ContinuousReadSupport} instances when end users run
- * {@code SparkSession.readStream.format(...).option(...).load()} with a continuous trigger.
- */
-@Evolving
-public interface ContinuousReadSupportProvider extends DataSourceV2 {
-
- /**
- * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
- * source with a user specified schema, which is called by Spark at the beginning of each
- * continuous streaming query.
- *
- * By default this method throws {@link UnsupportedOperationException}, implementations should
- * override this method to handle user specified schema.
- *
- * @param schema the user provided schema.
- * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
- * recovery. Readers for the same logical source in the same query
- * will be given the same checkpointLocation.
- * @param options the options for the returned data source reader, which is an immutable
- * case-insensitive string-to-string map.
- */
- default ContinuousReadSupport createContinuousReadSupport(
- StructType schema,
- String checkpointLocation,
- DataSourceOptions options) {
- return DataSourceV2Utils.failForUserSpecifiedSchema(this);
- }
-
- /**
- * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
- * source, which is called by Spark at the beginning of each continuous streaming query.
- *
- * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
- * recovery. Readers for the same logical source in the same query
- * will be given the same checkpointLocation.
- * @param options the options for the returned data source reader, which is an immutable
- * case-insensitive string-to-string map.
- */
- ContinuousReadSupport createContinuousReadSupport(
- String checkpointLocation,
- DataSourceOptions options);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
index 4aaa57d..43bdcca 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.Evolving;
/**
- * TODO: remove it when we finish the API refactor for streaming side.
+ * TODO: remove it when we finish the API refactor for streaming write side.
*/
@Evolving
public interface DataSourceV2 {}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
similarity index 59%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
index 4922962..b7fa3f2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
@@ -15,16 +15,20 @@
* limitations under the License.
*/
-package org.apache.spark.sql.sources.v2.reader;
+package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
/**
- * An interface for building the {@link ScanConfig}. Implementations can mixin those
- * SupportsPushDownXYZ interfaces to do operator pushdown, and keep the operator pushdown result in
- * the returned {@link ScanConfig}.
+ * An empty mix-in interface for {@link Table}, to indicate that this table supports streaming scans
+ * in continuous mode.
+ * <p>
+ * If a {@link Table} implements this interface,
+ * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
+ * builds a {@link Scan} with {@link Scan#toContinuousStream(String)} implemented.
+ * </p>
*/
@Evolving
-public interface ScanConfigBuilder {
- ScanConfig build();
-}
+public interface SupportsContinuousRead extends SupportsRead { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
index bcfa198..28d80b7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
@@ -42,7 +42,7 @@ public interface Batch {
InputPartition[] planInputPartitions();
/**
- * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+ * Returns a factory to create a {@link PartitionReader} for each {@link InputPartition}.
*/
PartitionReaderFactory createReaderFactory();
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
deleted file mode 100644
index 518a8b0..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * An interface that defines how to load the data from data source for batch processing.
- *
- * The execution engine will get an instance of this interface from a data source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
- * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}.
- * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
- * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader
- * factory to scan data from the data source with a Spark job.
- */
-@Evolving
-public interface BatchReadSupport extends ReadSupport {
-
- /**
- * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
- * {@link ScanConfig} for each data scanning job.
- *
- * The builder can take some query specific information to do operators pushdown, and keep these
- * information in the created {@link ScanConfig}.
- *
- * This is the first step of the data scan. All other methods in {@link BatchReadSupport} needs
- * to take {@link ScanConfig} as an input.
- */
- ScanConfigBuilder newScanConfigBuilder();
-
- /**
- * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
- */
- PartitionReaderFactory createReaderFactory(ScanConfig config);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
index 5f52480..4133497 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.Evolving;
/**
* A serializable representation of an input partition returned by
- * {@link ReadSupport#planInputPartitions(ScanConfig)}.
+ * {@link Batch#planInputPartitions()} and the corresponding ones in streaming.
*
* Note that {@link InputPartition} will be serialized and sent to executors, then
* {@link PartitionReader} will be created by
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java
deleted file mode 100644
index 347a465..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
-
-/**
- * A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to
- * report data partitioning and try to avoid shuffle at Spark side.
- *
- * Note that, when a {@link ReadSupport} implementation creates exactly one {@link InputPartition},
- * Spark may avoid adding a shuffle even if the reader does not implement this interface.
- */
-@Evolving
-// TODO: remove it, after we finish the API refactor completely.
-public interface OldSupportsReportPartitioning extends ReadSupport {
-
- /**
- * Returns the output data partitioning that this reader guarantees.
- */
- Partitioning outputPartitioning(ScanConfig config);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java
deleted file mode 100644
index 0d3ec17..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to
- * report statistics to Spark.
- *
- * As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the
- * data source. Implementations that return more accurate statistics based on pushed operators will
- * not improve query performance until the planner can push operators before getting stats.
- */
-@Evolving
-// TODO: remove it, after we finish the API refactor completely.
-public interface OldSupportsReportStatistics extends ReadSupport {
-
- /**
- * Returns the estimated statistics of this data source scan.
- */
- Statistics estimateStatistics(ScanConfig config);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
deleted file mode 100644
index b1f610a..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * The base interface for all the batch and streaming read supports. Data sources should implement
- * concrete read support interfaces like {@link BatchReadSupport}.
- *
- * If Spark fails to execute any methods in the implementations of this interface (by throwing an
- * exception), the read action will fail and no Spark job will be submitted.
- */
-@Evolving
-public interface ReadSupport {
-
- /**
- * Returns the full schema of this data source, which is usually the physical schema of the
- * underlying storage. This full schema should not be affected by column pruning or other
- * optimizations.
- */
- StructType fullSchema();
-
- /**
- * Returns a list of {@link InputPartition input partitions}. Each {@link InputPartition}
- * represents a data split that can be processed by one Spark task. The number of input
- * partitions returned here is the same as the number of RDD partitions this scan outputs.
- *
- * Note that, this may not be a full scan if the data source supports optimization like filter
- * push-down. Implementations should check the input {@link ScanConfig} and adjust the resulting
- * {@link InputPartition input partitions}.
- */
- InputPartition[] planInputPartitions(ScanConfig config);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
index c60fb2b..25ab06e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
@@ -18,9 +18,11 @@
package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
import org.apache.spark.sql.sources.v2.Table;
@@ -65,7 +67,7 @@ public interface Scan {
* @throws UnsupportedOperationException
*/
default Batch toBatch() {
- throw new UnsupportedOperationException("Batch scans are not supported");
+ throw new UnsupportedOperationException(description() + ": Batch scans are not supported");
}
/**
@@ -81,6 +83,22 @@ public interface Scan {
* @throws UnsupportedOperationException
*/
default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
- throw new UnsupportedOperationException("Micro-batch scans are not supported");
+ throw new UnsupportedOperationException(description() + ": Micro-batch scans are not supported");
+ }
+
+ /**
+ * Returns the physical representation of this scan for a streaming query in continuous mode. By
+ * default this method throws an exception; data sources must override this method to provide an
+ * implementation if the {@link Table} that creates this scan implements
+ * {@link SupportsContinuousRead}.
+ *
+ * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+ * recovery. Data streams for the same logical source in the same query
+ * will be given the same checkpointLocation.
+ *
+ * @throws UnsupportedOperationException
+ */
+ default ContinuousStream toContinuousStream(String checkpointLocation) {
+ throw new UnsupportedOperationException(description() + ": Continuous scans are not supported");
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
deleted file mode 100644
index c8cff68..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * An interface that carries query specific information for the data scanning job, like operator
- * pushdown information and streaming query offsets. This is defined as an empty interface, and data
- * sources should define their own {@link ScanConfig} classes.
- *
- * For APIs that take a {@link ScanConfig} as input, like
- * {@link ReadSupport#planInputPartitions(ScanConfig)},
- * {@link BatchReadSupport#createReaderFactory(ScanConfig)} and
- * {@link OldSupportsReportStatistics#estimateStatistics(ScanConfig)}, implementations mostly need
- * to cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source.
- */
-@Evolving
-public interface ScanConfig {
-
- /**
- * Returns the actual schema of this data source reader, which may be different from the physical
- * schema of the underlying storage, as column pruning or other optimizations may happen.
- *
- * If this method fails (by throwing an exception), the action will fail and no Spark job will be
- * submitted.
- */
- StructType readSchema();
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
index 60e71c5..862bd14 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
@@ -35,7 +35,7 @@ public interface SupportsPushDownRequiredColumns extends ScanBuilder {
* also OK to do the pruning partially, e.g., a data source may not be able to prune nested
* fields, and only prune top-level columns.
*
- * Note that, {@link ScanConfig#readSchema()} implementation should take care of the column
+ * Note that, {@link Scan#readSchema()} implementation should take care of the column
* pruning applied here.
*/
void pruneColumns(StructType requiredSchema);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
index ba17581..4ce97bc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
@@ -21,14 +21,14 @@ import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
/**
- * A mix in interface for {@link Batch}. Data sources can implement this interface to
+ * A mix in interface for {@link Scan}. Data sources can implement this interface to
* report data partitioning and try to avoid shuffle at Spark side.
*
- * Note that, when a {@link Batch} implementation creates exactly one {@link InputPartition},
+ * Note that, when a {@link Scan} implementation creates exactly one {@link InputPartition},
* Spark may avoid adding a shuffle even if the reader does not implement this interface.
*/
@Evolving
-public interface SupportsReportPartitioning extends Batch {
+public interface SupportsReportPartitioning extends Scan {
/**
* Returns the output data partitioning that this reader guarantees.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
index d9f5fb6..d7364af 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
/**
- * A mix in interface for {@link Batch}. Data sources can implement this interface to
+ * A mix in interface for {@link Scan}. Data sources can implement this interface to
* report statistics to Spark.
*
* As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.Evolving;
* not improve query performance until the planner can push operators before getting stats.
*/
@Evolving
-public interface SupportsReportStatistics extends Batch {
+public interface SupportsReportStatistics extends Scan {
/**
* Returns the estimated statistics of this data source scan.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
deleted file mode 100644
index 2b784ac..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.ScanConfig;
-import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
-
-/**
- * An interface that defines how to load the data from data source for continuous streaming
- * processing.
- *
- * The execution engine will get an instance of this interface from a data source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
- * streaming query, then call {@link #newScanConfigBuilder(Offset)} and create an instance of
- * {@link ScanConfig} for the duration of the streaming query or until
- * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
- * input partitions and reader factory to scan data with a Spark job for its duration. At the end
- * {@link #stop()} will be called when the streaming execution is completed. Note that a single
- * query may have multiple executions due to restart or failure recovery.
- */
-@Evolving
-public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
-
- /**
- * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
- * {@link ScanConfig} for each data scanning job.
- *
- * The builder can take some query specific information to do operators pushdown, store streaming
- * offsets, etc., and keep these information in the created {@link ScanConfig}.
- *
- * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
- * needs to take {@link ScanConfig} as an input.
- */
- ScanConfigBuilder newScanConfigBuilder(Offset start);
-
- /**
- * Returns a factory, which produces one {@link ContinuousPartitionReader} for one
- * {@link InputPartition}.
- */
- ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config);
-
- /**
- * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
- * for each partition to a single global offset.
- */
- Offset mergeOffsets(PartitionOffset[] offsets);
-
- /**
- * The execution engine will call this method in every epoch to determine if new input
- * partitions need to be generated, which may be required if for example the underlying
- * source system has had partitions added or removed.
- *
- * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport}
- * instance.
- */
- default boolean needsReconfiguration(ScanConfig config) {
- return false;
- }
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousStream.java
similarity index 55%
copy from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
copy to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousStream.java
index 2fb3957..fff5b95 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousStream.java
@@ -19,23 +19,16 @@ package org.apache.spark.sql.sources.v2.reader.streaming;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.PartitionReader;
-import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
import org.apache.spark.sql.sources.v2.reader.Scan;
/**
- * A {@link SparkDataStream} for streaming queries with micro-batch mode.
+ * A {@link SparkDataStream} for streaming queries with continuous mode.
*/
@Evolving
-public interface MicroBatchStream extends SparkDataStream {
+public interface ContinuousStream extends SparkDataStream {
/**
- * Returns the most recent offset available.
- */
- Offset latestOffset();
-
- /**
- * Returns a list of {@link InputPartition input partitions} given the start and end offsets. Each
+ * Returns a list of {@link InputPartition input partitions} given the start offset. Each
* {@link InputPartition} represents a data split that can be processed by one Spark task. The
* number of input partitions returned here is the same as the number of RDD partitions this scan
* outputs.
@@ -44,14 +37,34 @@ public interface MicroBatchStream extends SparkDataStream {
* and is responsible for creating splits for that filter, which is not a full scan.
* </p>
* <p>
- * This method will be called multiple times, to launch one Spark job for each micro-batch in this
- * data stream.
+ * This method will be called to launch one Spark job for reading the data stream. It will be
+ * called more than once, if {@link #needsReconfiguration()} returns true and Spark needs to
+ * launch a new job.
* </p>
*/
- InputPartition[] planInputPartitions(Offset start, Offset end);
+ InputPartition[] planInputPartitions(Offset start);
+
+ /**
+ * Returns a factory to create a {@link ContinuousPartitionReader} for each
+ * {@link InputPartition}.
+ */
+ ContinuousPartitionReaderFactory createContinuousReaderFactory();
+
+ /**
+ * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
+ * for each partition to a single global offset.
+ */
+ Offset mergeOffsets(PartitionOffset[] offsets);
/**
- * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+ * The execution engine will call this method in every epoch to determine if new input
+ * partitions need to be generated, which may be required if for example the underlying
+ * source system has had partitions added or removed.
+ *
+ * If true, the Spark job to scan this continuous data stream will be interrupted and Spark will
+ * launch it again with a new list of {@link InputPartition input partitions}.
*/
- PartitionReaderFactory createReaderFactory();
+ default boolean needsReconfiguration() {
+ return false;
+ }
}
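For orientation, here is a hedged, self-contained sketch, not part of this patch, of a trivial `ContinuousStream` over a fixed set of partitions. It walks through the lifecycle the new interface expects (initial offset, partition planning from a start offset, reader factory, offset merging, reconfiguration check); all `Fixed*` names are hypothetical and the offset (de)serialization is deliberately simplified.
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming._

// Hypothetical offset and partition types; a real source would round-trip real positions as JSON.
case class FixedPartitionOffset(partition: Int, pos: Long) extends PartitionOffset
case class FixedOffset(positions: Map[Int, Long]) extends Offset {
  override def json(): String =
    positions.map { case (p, o) => s""""$p":$o""" }.mkString("{", ",", "}")
}
case class FixedInputPartition(partition: Int, startPos: Long) extends InputPartition

object FixedReaderFactory extends ContinuousPartitionReaderFactory {
  override def createReader(partition: InputPartition): ContinuousPartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[FixedInputPartition]
    new ContinuousPartitionReader[InternalRow] {
      private var pos = p.startPos
      override def next(): Boolean = { pos += 1; true } // an endless stream of rows
      override def get(): InternalRow = InternalRow(p.partition, pos)
      override def getOffset(): PartitionOffset = FixedPartitionOffset(p.partition, pos)
      override def close(): Unit = ()
    }
  }
}

class FixedContinuousStream(numPartitions: Int) extends ContinuousStream {
  override def initialOffset(): Offset =
    FixedOffset((0 until numPartitions).map(_ -> 0L).toMap)

  // Sketch only: a real implementation would parse the JSON produced by FixedOffset.json().
  override def deserializeOffset(json: String): Offset = initialOffset()

  override def planInputPartitions(start: Offset): Array[InputPartition] = {
    val positions = start.asInstanceOf[FixedOffset].positions
    positions.toSeq.map { case (p, pos) => FixedInputPartition(p, pos) }.toArray
  }

  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory =
    FixedReaderFactory

  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset =
    FixedOffset(offsets.map { case FixedPartitionOffset(p, pos) => p -> pos }.toMap)

  // The partition count never changes, so the scan never needs to be relaunched.
  override def needsReconfiguration(): Boolean = false

  override def commit(end: Offset): Unit = ()
  override def stop(): Unit = ()
}
```
Compare with `KafkaContinuousStream` above, which additionally tracks `knownPartitions` so that `needsReconfiguration()` can detect Kafka partitions being added or removed.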
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
index 2fb3957..330f07b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchStream.java
@@ -51,7 +51,7 @@ public interface MicroBatchStream extends SparkDataStream {
InputPartition[] planInputPartitions(Offset start, Offset end);
/**
- * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+ * Returns a factory to create a {@link PartitionReader} for each {@link InputPartition}.
*/
PartitionReaderFactory createReaderFactory();
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
index 67bff0c..a066713 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.Evolving;
/**
* An abstract representation of progress through a {@link MicroBatchStream} or
- * {@link ContinuousReadSupport}.
+ * {@link ContinuousStream}.
* During execution, offsets provided by the data source implementation will be logged and used as
* restart checkpoints. Each source should provide an offset implementation which the source can use
* to reconstruct a position in the stream up to which data has been seen/processed.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java
index 8ea34be..30f38ce 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SparkDataStream.java
@@ -24,7 +24,8 @@ import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
* The base interface representing a readable data stream in a Spark streaming query. It's
* responsible to manage the offsets of the streaming source in the streaming query.
*
- * Data sources should implement concrete data stream interfaces: {@link MicroBatchStream}.
+ * Data sources should implement concrete data stream interfaces:
+ * {@link MicroBatchStream} and {@link ContinuousStream}.
*/
@Evolving
public interface SparkDataStream extends BaseStreamingSource {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
deleted file mode 100644
index 9a8c1bd..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.spark.sql.sources.v2.reader.ReadSupport;
-
-/**
- * A base interface for streaming read support. Data sources should implement concrete streaming
- * read support interfaces: {@link ContinuousReadSupport}.
- * This is exposed for a testing purpose.
- */
-@VisibleForTesting
-public interface StreamingReadSupport extends ReadSupport {
-
- /**
- * Returns the initial offset for a streaming query to start reading from. Note that the
- * streaming data source should not assume that it will start reading from its initial offset:
- * if Spark is restarting an existing query, it will restart from the check-pointed offset rather
- * than the initial one.
- */
- Offset initialOffset();
-
- /**
- * Deserialize a JSON string into an Offset of the implementation-defined offset type.
- *
- * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
- */
- Offset deserializeOffset(String json);
-
- /**
- * Informs the source that Spark has completed processing all data for offsets less than or
- * equal to `end` and will only request offsets greater than `end` in the future.
- */
- void commit(Offset end);
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
new file mode 100644
index 0000000..c7fcc67
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Physical plan node for scanning a batch of data from a data source v2.
+ */
+case class BatchScanExec(
+ output: Seq[AttributeReference],
+ @transient scan: Scan) extends DataSourceV2ScanExecBase {
+
+ @transient lazy val batch = scan.toBatch
+
+ // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+ override def equals(other: Any): Boolean = other match {
+ case other: BatchScanExec => this.batch == other.batch
+ case _ => false
+ }
+
+ override def hashCode(): Int = batch.hashCode()
+
+ override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()
+
+ override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
+
+ override lazy val inputRDD: RDD[InternalRow] = {
+ new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
+ }
+}
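BatchScanExec drives the Batch returned by Scan#toBatch. A rough sketch of the source side it consumes, with hypothetical names and assuming the Batch and Scan interfaces as they stand after this refactor:

    import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReaderFactory, Scan}
    import org.apache.spark.sql.types.StructType

    class ExampleScan(schema: StructType) extends Scan {
      override def readSchema(): StructType = schema

      override def toBatch(): Batch = new Batch {
        // These partitions become the partitions of BatchScanExec's input RDD.
        override def planInputPartitions(): Array[InputPartition] = Array(new InputPartition {})
        // One PartitionReader is created per InputPartition by this factory (elided here).
        override def createReaderFactory(): PartitionReaderFactory = ???
      }
    }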
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
index c735b0e..f54ff60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
@@ -20,99 +20,44 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
-import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, Offset}
/**
* Physical plan node for scanning data from a streaming data source with continuous mode.
*/
-// TODO: merge it and `MicroBatchScanExec`.
case class ContinuousScanExec(
- output: Seq[AttributeReference],
- @transient source: DataSourceV2,
- @transient options: Map[String, String],
- @transient pushedFilters: Seq[Expression],
- @transient readSupport: ReadSupport,
- @transient scanConfig: ScanConfig)
- extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
-
- override def simpleString(maxFields: Int): String = "ScanV2 " + metadataString(maxFields)
+ output: Seq[Attribute],
+ @transient scan: Scan,
+ @transient stream: ContinuousStream,
+ @transient start: Offset) extends DataSourceV2ScanExecBase {
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
- case other: ContinuousScanExec =>
- output == other.output && readSupport.getClass == other.readSupport.getClass &&
- options == other.options
+ case other: ContinuousScanExec => this.stream == other.stream
case _ => false
}
- override def hashCode(): Int = {
- Seq(output, source, options).hashCode()
- }
-
- override def outputPartitioning: physical.Partitioning = readSupport match {
- case _ if partitions.length == 1 =>
- SinglePartition
+ override def hashCode(): Int = stream.hashCode()
- case s: OldSupportsReportPartitioning =>
- new DataSourcePartitioning(
- s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))
+ override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start)
- case _ => super.outputPartitioning
+ override lazy val readerFactory: ContinuousPartitionReaderFactory = {
+ stream.createContinuousReaderFactory()
}
- private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
-
- private lazy val readerFactory = readSupport match {
- case r: ContinuousReadSupport => r.createContinuousReaderFactory(scanConfig)
- case _ => throw new IllegalStateException("unknown read support: " + readSupport)
- }
-
- override val supportsBatch: Boolean = {
- require(partitions.forall(readerFactory.supportColumnarReads) ||
- !partitions.exists(readerFactory.supportColumnarReads),
- "Cannot mix row-based and columnar input partitions.")
-
- partitions.exists(readerFactory.supportColumnarReads)
- }
-
- private lazy val inputRDD: RDD[InternalRow] = readSupport match {
- case _: ContinuousReadSupport =>
- assert(!supportsBatch,
- "continuous stream reader does not support columnar read yet.")
- EpochCoordinatorRef.get(
- sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
- sparkContext.env)
- .askSync[Unit](SetReaderPartitions(partitions.size))
- new ContinuousDataSourceRDD(
- sparkContext,
- sqlContext.conf.continuousStreamingExecutorQueueSize,
- sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
- partitions,
- schema,
- readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
-
- case _ =>
- new DataSourceRDD(
- sparkContext, partitions, readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch)
- }
-
- override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
-
- override protected def doExecute(): RDD[InternalRow] = {
- if (supportsBatch) {
- WholeStageCodegenExec(this)(codegenStageId = 0).execute()
- } else {
- val numOutputRows = longMetric("numOutputRows")
- inputRDD.map { r =>
- numOutputRows += 1
- r
- }
- }
+ override lazy val inputRDD: RDD[InternalRow] = {
+ EpochCoordinatorRef.get(
+ sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+ sparkContext.env)
+ .askSync[Unit](SetReaderPartitions(partitions.size))
+ new ContinuousDataSourceRDD(
+ sparkContext,
+ sqlContext.conf.continuousStreamingExecutorQueueSize,
+ sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
+ partitions,
+ schema,
+ readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 63e97e6..47cf26d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.sources.v2._
@@ -58,8 +58,6 @@ case class DataSourceV2Relation(
case _ => throw new AnalysisException(s"Table is not readable: ${table.name()}")
}
-
-
def newWriteBuilder(schema: StructType): WriteBuilder = table match {
case s: SupportsBatchWrite =>
val dsOptions = new DataSourceOptions(options.asJava)
@@ -94,7 +92,7 @@ case class DataSourceV2Relation(
*/
case class StreamingDataSourceV2Relation(
output: Seq[Attribute],
- scanDesc: String,
+ scan: Scan,
stream: SparkDataStream,
startOffset: Option[Offset] = None,
endOffset: Option[Offset] = None)
@@ -104,7 +102,7 @@ case class StreamingDataSourceV2Relation(
override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
- override def computeStats(): Statistics = stream match {
+ override def computeStats(): Statistics = scan match {
case r: SupportsReportStatistics =>
val statistics = r.estimateStatistics()
Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
@@ -113,46 +111,6 @@ case class StreamingDataSourceV2Relation(
}
}
-// TODO: remove it after finish API refactor for continuous streaming.
-case class OldStreamingDataSourceV2Relation(
- output: Seq[AttributeReference],
- source: DataSourceV2,
- options: Map[String, String],
- readSupport: ReadSupport,
- scanConfigBuilder: ScanConfigBuilder)
- extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
-
- override def isStreaming: Boolean = true
-
- override def simpleString(maxFields: Int): String = {
- "Streaming RelationV2 " + metadataString(maxFields)
- }
-
- override def pushedFilters: Seq[Expression] = Nil
-
- override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
-
- // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
- override def equals(other: Any): Boolean = other match {
- case other: OldStreamingDataSourceV2Relation =>
- output == other.output && readSupport.getClass == other.readSupport.getClass &&
- options == other.options
- case _ => false
- }
-
- override def hashCode(): Int = {
- Seq(output, source, options).hashCode()
- }
-
- override def computeStats(): Statistics = readSupport match {
- case r: OldSupportsReportStatistics =>
- val statistics = r.estimateStatistics(scanConfigBuilder.build())
- Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
- case _ =>
- Statistics(sizeInBytes = conf.defaultSizeInBytes)
- }
-}
-
object DataSourceV2Relation {
def create(table: Table, options: Map[String, String]): DataSourceV2Relation = {
val output = table.schema().toAttributes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
similarity index 66%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 53e4e77..da71e78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -19,39 +19,26 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.AttributeMap
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
-/**
- * Physical plan node for scanning a batch of data from a data source.
- */
-case class DataSourceV2ScanExec(
- output: Seq[AttributeReference],
- scanDesc: String,
- @transient batch: Batch)
- extends LeafExecNode with ColumnarBatchScan {
-
- override def simpleString(maxFields: Int): String = {
- s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc"
- }
+trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
- // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
- override def equals(other: Any): Boolean = other match {
- case other: DataSourceV2ScanExec => this.batch == other.batch
- case _ => false
- }
+ def scan: Scan
- override def hashCode(): Int = batch.hashCode()
+ def partitions: Seq[InputPartition]
- private lazy val partitions = batch.planInputPartitions()
+ def readerFactory: PartitionReaderFactory
- private lazy val readerFactory = batch.createReaderFactory()
+ override def simpleString(maxFields: Int): String = {
+ s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
+ }
- override def outputPartitioning: physical.Partitioning = batch match {
+ override def outputPartitioning: physical.Partitioning = scan match {
case _ if partitions.length == 1 =>
SinglePartition
@@ -70,13 +57,11 @@ case class DataSourceV2ScanExec(
partitions.exists(readerFactory.supportColumnarReads)
}
- private lazy val inputRDD: RDD[InternalRow] = {
- new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
- }
+ def inputRDD: RDD[InternalRow]
override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
- override protected def doExecute(): RDD[InternalRow] = {
+ override def doExecute(): RDD[InternalRow] = {
if (supportsBatch) {
WholeStageCodegenExec(this)(codegenStageId = 0).execute()
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b4c5471..40ac5cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
object DataSourceV2Strategy extends Strategy {
@@ -117,7 +117,7 @@ object DataSourceV2Strategy extends Strategy {
|Output: ${output.mkString(", ")}
""".stripMargin)
- val plan = DataSourceV2ScanExec(output, scan.description(), scan.toBatch)
+ val plan = BatchScanExec(output, scan)
val filterCondition = postScanFilters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, plan)).getOrElse(plan)
@@ -130,15 +130,14 @@ object DataSourceV2Strategy extends Strategy {
// ensure there is a projection, which will produce unsafe rows required by some operators
ProjectExec(r.output,
MicroBatchScanExec(
- r.output, r.scanDesc, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
+ r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
- case r: OldStreamingDataSourceV2Relation =>
- // TODO: support operator pushdown for streaming data sources.
- val scanConfig = r.scanConfigBuilder.build()
+ case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>
+ val continuousStream = r.stream.asInstanceOf[ContinuousStream]
// ensure there is a projection, which will produce unsafe rows required by some operators
ProjectExec(r.output,
ContinuousScanExec(
- r.output, r.source, r.options, r.pushedFilters, r.readSupport, scanConfig)) :: Nil
+ r.output, r.scan, continuousStream, r.startOffset.get)) :: Nil
case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
@@ -158,8 +157,7 @@ object DataSourceV2Strategy extends Strategy {
case Repartition(1, false, child) =>
val isContinuous = child.find {
- case s: OldStreamingDataSourceV2Relation =>
- s.readSupport.isInstanceOf[ContinuousReadSupport]
+ case r: StreamingDataSourceV2Relation => r.stream.isInstanceOf[ContinuousStream]
case _ => false
}.isDefined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
index feea8bc..d2e33d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
@@ -19,12 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
-import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
-import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
-import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
/**
@@ -32,14 +28,10 @@ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offse
*/
case class MicroBatchScanExec(
output: Seq[Attribute],
- scanDesc: String,
+ @transient scan: Scan,
@transient stream: MicroBatchStream,
@transient start: Offset,
- @transient end: Offset) extends LeafExecNode with ColumnarBatchScan {
-
- override def simpleString(maxFields: Int): String = {
- s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc"
- }
+ @transient end: Offset) extends DataSourceV2ScanExecBase {
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
@@ -49,44 +41,11 @@ case class MicroBatchScanExec(
override def hashCode(): Int = stream.hashCode()
- private lazy val partitions = stream.planInputPartitions(start, end)
-
- private lazy val readerFactory = stream.createReaderFactory()
-
- override def outputPartitioning: physical.Partitioning = stream match {
- case _ if partitions.length == 1 =>
- SinglePartition
-
- case s: SupportsReportPartitioning =>
- new DataSourcePartitioning(
- s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
-
- case _ => super.outputPartitioning
- }
-
- override def supportsBatch: Boolean = {
- require(partitions.forall(readerFactory.supportColumnarReads) ||
- !partitions.exists(readerFactory.supportColumnarReads),
- "Cannot mix row-based and columnar input partitions.")
+ override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end)
- partitions.exists(readerFactory.supportColumnarReads)
- }
+ override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
- private lazy val inputRDD: RDD[InternalRow] = {
+ override lazy val inputRDD: RDD[InternalRow] = {
new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
}
-
- override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
-
- override protected def doExecute(): RDD[InternalRow] = {
- if (supportsBatch) {
- WholeStageCodegenExec(this)(codegenStageId = 0).execute()
- } else {
- val numOutputRows = longMetric("numOutputRows")
- inputRDD.map { r =>
- numOutputRows += 1
- r
- }
- }
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 792e3a3..2c33975 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -99,7 +99,7 @@ class MicroBatchExecution(
// TODO: operator pushdown.
val scan = table.newScanBuilder(dsOptions).build()
val stream = scan.toMicroBatchStream(metadataPath)
- StreamingDataSourceV2Relation(output, scan.description(), stream)
+ StreamingDataSourceV2Relation(output, scan, stream)
})
case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SimpleStreamingScanConfigBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SimpleStreamingScanConfigBuilder.scala
deleted file mode 100644
index 1be0716..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SimpleStreamingScanConfigBuilder.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.apache.spark.sql.sources.v2.reader.{ScanConfig, ScanConfigBuilder}
-import org.apache.spark.sql.types.StructType
-
-/**
- * A very simple [[ScanConfigBuilder]] implementation that creates a simple [[ScanConfig]] to
- * carry schema and offsets for streaming data sources.
- */
-class SimpleStreamingScanConfigBuilder(
- schema: StructType,
- start: Offset,
- end: Option[Offset] = None)
- extends ScanConfigBuilder {
-
- override def build(): ScanConfig = SimpleStreamingScanConfig(schema, start, end)
-}
-
-case class SimpleStreamingScanConfig(
- readSchema: StructType,
- start: Offset,
- end: Option[Offset])
- extends ScanConfig
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 535fa1c..83d38dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceV2, Table}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, Table}
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@@ -111,30 +111,6 @@ case class StreamingRelationV2(
}
/**
- * Used to link a [[DataSourceV2]] into a continuous processing execution.
- */
-case class ContinuousExecutionRelation(
- source: ContinuousReadSupportProvider,
- extraOptions: Map[String, String],
- output: Seq[Attribute])(session: SparkSession)
- extends LeafNode with MultiInstanceRelation {
-
- override def otherCopyArgs: Seq[AnyRef] = session :: Nil
- override def isStreaming: Boolean = true
- override def toString: String = source.toString
-
- // There's no sensible value here. On the execution path, this relation will be
- // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
- // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
- // value.
- override def computeStats(): Statistics = Statistics(
- sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
- )
-
- override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
-}
-
-/**
* A dummy physical plan for [[StreamingRelation]] to support
* [[org.apache.spark.sql.Dataset.explain]]
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index c74fa14..b22795d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -22,20 +22,18 @@ import java.util.concurrent.TimeUnit
import java.util.function.UnaryOperator
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{ContinuousScanExec, OldStreamingDataSourceV2Relation}
-import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider}
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider, SupportsContinuousRead}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock
@@ -54,25 +52,39 @@ class ContinuousExecution(
sparkSession, name, checkpointRoot, analyzedPlan, sink,
trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
- @volatile protected var continuousSources: Seq[ContinuousReadSupport] = Seq()
- override protected def sources: Seq[BaseStreamingSource] = continuousSources
+ @volatile protected var sources: Seq[ContinuousStream] = Seq()
// For use only in test harnesses.
private[sql] var currentEpochCoordinatorId: String = _
override val logicalPlan: LogicalPlan = {
- val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
- analyzedPlan.transform {
- case r @ StreamingRelationV2(
- source: ContinuousReadSupportProvider, _, _, extraReaderOptions, output, _) =>
- // TODO: shall we create `ContinuousReadSupport` here instead of each reconfiguration?
- toExecutionRelationMap.getOrElseUpdate(r, {
- ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
+ var nextSourceId = 0
+ val _logicalPlan = analyzedPlan.transform {
+ case s @ StreamingRelationV2(
+ ds, dsName, table: SupportsContinuousRead, options, output, _) =>
+ v2ToRelationMap.getOrElseUpdate(s, {
+ val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ nextSourceId += 1
+ logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
+ val dsOptions = new DataSourceOptions(options.asJava)
+ // TODO: operator pushdown.
+ val scan = table.newScanBuilder(dsOptions).build()
+ val stream = scan.toContinuousStream(metadataPath)
+ StreamingDataSourceV2Relation(output, scan, stream)
})
+
case StreamingRelationV2(_, sourceName, _, _, _, _) =>
throw new UnsupportedOperationException(
s"Data source $sourceName does not support continuous processing.")
}
+
+ sources = _logicalPlan.collect {
+ case r: StreamingDataSourceV2Relation => r.stream.asInstanceOf[ContinuousStream]
+ }
+ uniqueSources = sources.distinct
+
+ _logicalPlan
}
private val triggerExecutor = trigger match {
@@ -92,6 +104,8 @@ class ContinuousExecution(
do {
runContinuous(sparkSessionForStream)
} while (state.updateAndGet(stateUpdate) == ACTIVE)
+
+ stopSources()
}
/**
@@ -135,7 +149,7 @@ class ContinuousExecution(
updateStatusMessage("Starting new streaming query")
logInfo(s"Starting new streaming query.")
currentBatchId = 0
- OffsetSeq.fill(continuousSources.map(_ => null): _*)
+ OffsetSeq.fill(sources.map(_ => null): _*)
}
}
@@ -144,47 +158,17 @@ class ContinuousExecution(
* @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
*/
private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
- // A list of attributes that will need to be updated.
- val replacements = new ArrayBuffer[(Attribute, Attribute)]
- // Translate from continuous relation to the underlying data source.
- var nextSourceId = 0
- continuousSources = logicalPlan.collect {
- case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
- val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- nextSourceId += 1
-
- dataSource.createContinuousReadSupport(
- metadataPath,
- new DataSourceOptions(extraReaderOptions.asJava))
- }
- uniqueSources = continuousSources.distinct
-
val offsets = getStartOffsets(sparkSessionForQuery)
- var insertedSourceId = 0
- val withNewSources = logicalPlan transform {
- case ContinuousExecutionRelation(source, options, output) =>
- val readSupport = continuousSources(insertedSourceId)
- insertedSourceId += 1
- val newOutput = readSupport.fullSchema().toAttributes
- val maxFields = SQLConf.get.maxToStringFields
- assert(output.size == newOutput.size,
- s"Invalid reader: ${truncatedString(output, ",", maxFields)} != " +
- s"${truncatedString(newOutput, ",", maxFields)}")
- replacements ++= output.zip(newOutput)
-
+ val withNewSources: LogicalPlan = logicalPlan transform {
+ case relation: StreamingDataSourceV2Relation =>
val loggedOffset = offsets.offsets(0)
- val realOffset = loggedOffset.map(off => readSupport.deserializeOffset(off.json))
- val startOffset = realOffset.getOrElse(readSupport.initialOffset)
- val scanConfigBuilder = readSupport.newScanConfigBuilder(startOffset)
- OldStreamingDataSourceV2Relation(newOutput, source, options, readSupport, scanConfigBuilder)
+ val realOffset = loggedOffset.map(off => relation.stream.deserializeOffset(off.json))
+ val startOffset = realOffset.getOrElse(relation.stream.initialOffset)
+ relation.copy(startOffset = Some(startOffset))
}
- // Rewire the plan to use the new attributes that were returned by the source.
- val replacementMap = AttributeMap(replacements)
- val triggerLogicalPlan = withNewSources transformAllExpressions {
- case a: Attribute if replacementMap.contains(a) =>
- replacementMap(a).withMetadata(a.metadata)
+ withNewSources.transformAllExpressions {
case (_: CurrentTimestamp | _: CurrentDate) =>
throw new IllegalStateException(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
@@ -192,15 +176,15 @@ class ContinuousExecution(
val writer = sink.createStreamingWriteSupport(
s"$runId",
- triggerLogicalPlan.schema,
+ withNewSources.schema,
outputMode,
new DataSourceOptions(extraOptions.asJava))
- val withSink = WriteToContinuousDataSource(writer, triggerLogicalPlan)
+ val planWithSink = WriteToContinuousDataSource(writer, withNewSources)
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionForQuery,
- withSink,
+ planWithSink,
outputMode,
checkpointFile("state"),
id,
@@ -210,10 +194,9 @@ class ContinuousExecution(
lastExecution.executedPlan // Force the lazy generation of execution plan
}
- val (readSupport, scanConfig) = lastExecution.executedPlan.collect {
- case scan: ContinuousScanExec
- if scan.readSupport.isInstanceOf[ContinuousReadSupport] =>
- scan.readSupport.asInstanceOf[ContinuousReadSupport] -> scan.scanConfig
+ val stream = planWithSink.collect {
+ case relation: StreamingDataSourceV2Relation =>
+ relation.stream.asInstanceOf[ContinuousStream]
}.head
sparkSessionForQuery.sparkContext.setLocalProperty(
@@ -233,16 +216,14 @@ class ContinuousExecution(
// Use the parent Spark session for the endpoint since it's where this query ID is registered.
val epochEndpoint =
EpochCoordinatorRef.create(
- writer, readSupport, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+ writer, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
val epochUpdateThread = new Thread(new Runnable {
override def run: Unit = {
try {
triggerExecutor.execute(() => {
startTrigger()
- val shouldReconfigure = readSupport.needsReconfiguration(scanConfig) &&
- state.compareAndSet(ACTIVE, RECONFIGURING)
- if (shouldReconfigure) {
+ if (stream.needsReconfiguration && state.compareAndSet(ACTIVE, RECONFIGURING)) {
if (queryExecutionThread.isAlive) {
queryExecutionThread.interrupt()
}
@@ -289,7 +270,6 @@ class ContinuousExecution(
epochUpdateThread.interrupt()
epochUpdateThread.join()
- stopSources()
sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
}
@@ -299,11 +279,11 @@ class ContinuousExecution(
*/
def addOffset(
epoch: Long,
- readSupport: ContinuousReadSupport,
+ stream: ContinuousStream,
partitionOffsets: Seq[PartitionOffset]): Unit = {
- assert(continuousSources.length == 1, "only one continuous source supported currently")
+ assert(sources.length == 1, "only one continuous source supported currently")
- val globalOffset = readSupport.mergeOffsets(partitionOffsets.toArray)
+ val globalOffset = stream.mergeOffsets(partitionOffsets.toArray)
val oldOffset = synchronized {
offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
offsetLog.get(epoch - 1)
@@ -329,7 +309,7 @@ class ContinuousExecution(
def commit(epoch: Long): Unit = {
updateStatusMessage(s"Committing epoch $epoch")
- assert(continuousSources.length == 1, "only one continuous source supported currently")
+ assert(sources.length == 1, "only one continuous source supported currently")
assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
synchronized {
@@ -338,9 +318,9 @@ class ContinuousExecution(
if (queryExecutionThread.isAlive) {
commitLog.add(epoch, CommitMetadata())
val offset =
- continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
- committedOffsets ++= Seq(continuousSources(0) -> offset)
- continuousSources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset])
+ sources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
+ committedOffsets ++= Seq(sources(0) -> offset)
+ sources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset])
} else {
return
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index a6cde2b..48ff70f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -22,23 +22,22 @@ import org.json4s.jackson.Serialization
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{RateStreamOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder, ValueRunTimeMsPair}
-import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.types.StructType
case class RateStreamPartitionOffset(
partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
-class RateStreamContinuousReadSupport(options: DataSourceOptions) extends ContinuousReadSupport {
+class RateStreamContinuousStream(
+ rowsPerSecond: Long,
+ numPartitions: Int,
+ options: DataSourceOptions) extends ContinuousStream {
implicit val defaultFormats: DefaultFormats = DefaultFormats
val creationTime = System.currentTimeMillis()
- val numPartitions = options.get(RateStreamProvider.NUM_PARTITIONS).orElse("5").toInt
- val rowsPerSecond = options.get(RateStreamProvider.ROWS_PER_SECOND).orElse("6").toLong
val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
@@ -54,18 +53,10 @@ class RateStreamContinuousReadSupport(options: DataSourceOptions) extends Contin
RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json))
}
- override def fullSchema(): StructType = RateStreamProvider.SCHEMA
-
- override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
- new SimpleStreamingScanConfigBuilder(fullSchema(), start)
- }
-
override def initialOffset: Offset = createInitialOffset(numPartitions, creationTime)
- override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
- val startOffset = config.asInstanceOf[SimpleStreamingScanConfig].start
-
- val partitionStartMap = startOffset match {
+ override def planInputPartitions(start: Offset): Array[InputPartition] = {
+ val partitionStartMap = start match {
case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
case off =>
throw new IllegalArgumentException(
@@ -91,8 +82,7 @@ class RateStreamContinuousReadSupport(options: DataSourceOptions) extends Contin
}.toArray
}
- override def createContinuousReaderFactory(
- config: ScanConfig): ContinuousPartitionReaderFactory = {
+ override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
RateStreamContinuousReaderFactory
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
index 28ab244..e7bc713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
@@ -31,37 +31,29 @@ import org.json4s.jackson.Serialization
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset => _, _}
import org.apache.spark.sql.execution.streaming.sources.TextSocketReader
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.types.StructType
import org.apache.spark.util.RpcUtils
/**
- * A ContinuousReadSupport that reads text lines through a TCP socket, designed only for tutorials
- * and debugging. This ContinuousReadSupport will *not* work in production applications due to
+ * A [[ContinuousStream]] that reads text lines through a TCP socket, designed only for tutorials
+ * and debugging. This ContinuousStream will *not* work in production applications due to
* multiple reasons, including no support for fault recovery.
*
* The driver maintains a socket connection to the host-port, keeps the received messages in
* buckets and serves the messages to the executors via a RPC endpoint.
*/
-class TextSocketContinuousReadSupport(options: DataSourceOptions)
- extends ContinuousReadSupport with Logging {
+class TextSocketContinuousStream(
+ host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
+ extends ContinuousStream with Logging {
implicit val defaultFormats: DefaultFormats = DefaultFormats
- private val host: String = options.get("host").get()
- private val port: Int = options.get("port").get().toInt
-
- assert(SparkSession.getActiveSession.isDefined)
- private val spark = SparkSession.getActiveSession.get
- private val numPartitions = spark.sparkContext.defaultParallelism
-
@GuardedBy("this")
private var socket: Socket = _
@@ -101,21 +93,9 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions)
startOffset
}
- override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
- new SimpleStreamingScanConfigBuilder(fullSchema(), start)
- }
-
- override def fullSchema(): StructType = {
- if (includeTimestamp) {
- TextSocketReader.SCHEMA_TIMESTAMP
- } else {
- TextSocketReader.SCHEMA_REGULAR
- }
- }
- override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
- val startOffset = config.asInstanceOf[SimpleStreamingScanConfig]
- .start.asInstanceOf[TextSocketOffset]
+ override def planInputPartitions(start: Offset): Array[InputPartition] = {
+ val startOffset = start.asInstanceOf[TextSocketOffset]
recordEndpoint.setStartOffsets(startOffset.offsets)
val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
@@ -140,8 +120,7 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions)
}.toArray
}
- override def createContinuousReaderFactory(
- config: ScanConfig): ContinuousPartitionReaderFactory = {
+ override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
TextSocketReaderFactory
}
@@ -197,7 +176,7 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions)
logWarning(s"Stream closed by $host:$port")
return
}
- TextSocketContinuousReadSupport.this.synchronized {
+ TextSocketContinuousStream.this.synchronized {
currentOffset += 1
val newData = (line,
Timestamp.valueOf(
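As a usage sketch (not part of the patch), this is how the socket source can be exercised in continuous mode after the change; the host, port and console sink below are placeholder choices for a tutorial-style query:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder().appName("socket-continuous").getOrCreate()

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // A continuous trigger makes the planner produce a ContinuousScanExec backed by
    // TextSocketContinuousStream instead of a micro-batch scan.
    val query = lines.writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()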
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index 2238ce2..d1bda79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.util.RpcUtils
@@ -83,14 +83,14 @@ private[sql] object EpochCoordinatorRef extends Logging {
*/
def create(
writeSupport: StreamingWriteSupport,
- readSupport: ContinuousReadSupport,
+ stream: ContinuousStream,
query: ContinuousExecution,
epochCoordinatorId: String,
startEpoch: Long,
session: SparkSession,
env: SparkEnv): RpcEndpointRef = synchronized {
val coordinator = new EpochCoordinator(
- writeSupport, readSupport, query, startEpoch, session, env.rpcEnv)
+ writeSupport, stream, query, startEpoch, session, env.rpcEnv)
val ref = env.rpcEnv.setupEndpoint(endpointName(epochCoordinatorId), coordinator)
logInfo("Registered EpochCoordinator endpoint")
ref
@@ -116,7 +116,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
*/
private[continuous] class EpochCoordinator(
writeSupport: StreamingWriteSupport,
- readSupport: ContinuousReadSupport,
+ stream: ContinuousStream,
query: ContinuousExecution,
startEpoch: Long,
session: SparkSession,
@@ -220,7 +220,7 @@ private[continuous] class EpochCoordinator(
partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
if (thisEpochOffsets.size == numReaderPartitions) {
logDebug(s"Epoch $epoch has offsets reported from all partitions: $thisEpochOffsets")
- query.addOffset(epoch, readSupport, thisEpochOffsets.toSeq)
+ query.addOffset(epoch, stream, thisEpochOffsets.toSeq)
resolveCommitsAtEpoch(epoch)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 5406679..e71f81c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsMicroBatchRead, Table, TableProvider}
+import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -68,7 +68,15 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas
def fullSchema(): StructType = encoder.schema
- protected def logicalPlan: LogicalPlan
+ protected val logicalPlan: LogicalPlan = {
+ StreamingRelationV2(
+ MemoryStreamTableProvider,
+ "memory",
+ new MemoryStreamTable(this),
+ Map.empty,
+ attributes,
+ None)(sqlContext.sparkSession)
+ }
def addData(data: TraversableOnce[A]): Offset
}
@@ -81,7 +89,8 @@ object MemoryStreamTableProvider extends TableProvider {
}
}
-class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table with SupportsMicroBatchRead {
+class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table
+ with SupportsMicroBatchRead with SupportsContinuousRead {
override def name(): String = "MemoryStreamDataSource"
@@ -101,7 +110,11 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
override def readSchema(): StructType = stream.fullSchema()
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
- stream.asInstanceOf[MemoryStream[_]]
+ stream.asInstanceOf[MicroBatchStream]
+ }
+
+ override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+ stream.asInstanceOf[ContinuousStream]
}
}
@@ -113,16 +126,6 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
extends MemoryStreamBase[A](sqlContext) with MicroBatchStream with Logging {
- protected val logicalPlan: LogicalPlan = {
- StreamingRelationV2(
- MemoryStreamTableProvider,
- "memory",
- new MemoryStreamTable(this),
- Map.empty,
- attributes,
- None)(sqlContext.sparkSession)
- }
-
protected val output = logicalPlan.output
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 8c5c9ef..41eaf84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -30,8 +30,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset => _, _}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions}
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, ScanConfig, ScanConfigBuilder}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming._
import org.apache.spark.util.RpcUtils
@@ -44,15 +43,10 @@ import org.apache.spark.util.RpcUtils
* the specified offset within the list, or null if that offset doesn't yet have a record.
*/
class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
- extends MemoryStreamBase[A](sqlContext)
- with ContinuousReadSupportProvider with ContinuousReadSupport {
+ extends MemoryStreamBase[A](sqlContext) with ContinuousStream {
private implicit val formats = Serialization.formats(NoTypeHints)
- protected val logicalPlan =
- // TODO: don't pass null as table after finish API refactor for continuous stream.
- StreamingRelationV2(this, "memory", null, Map(), attributes, None)(sqlContext.sparkSession)
-
// ContinuousReader implementation
@GuardedBy("this")
@@ -87,13 +81,9 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
)
}
- override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
- new SimpleStreamingScanConfigBuilder(fullSchema(), start)
- }
- override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
- val startOffset = config.asInstanceOf[SimpleStreamingScanConfig]
- .start.asInstanceOf[ContinuousMemoryStreamOffset]
+ override def planInputPartitions(start: Offset): Array[InputPartition] = {
+ val startOffset = start.asInstanceOf[ContinuousMemoryStreamOffset]
synchronized {
val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
endpointRef =
@@ -105,8 +95,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
}
}
- override def createContinuousReaderFactory(
- config: ScanConfig): ContinuousPartitionReaderFactory = {
+ override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
ContinuousMemoryStreamReaderFactory
}
@@ -115,12 +104,6 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
}
override def commit(end: Offset): Unit = {}
-
- // ContinuousReadSupportProvider implementation
- // This is necessary because of how StreamTest finds the source for AddDataMemory steps.
- override def createContinuousReadSupport(
- checkpointLocation: String,
- options: DataSourceOptions): ContinuousReadSupport = this
}
object ContinuousMemoryStream {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 8d334f0..075c6b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReadSupport
+import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousStream
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.types._
/**
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
* be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
*/
class RateStreamProvider extends DataSourceV2
- with TableProvider with ContinuousReadSupportProvider with DataSourceRegister {
+ with TableProvider with DataSourceRegister {
import RateStreamProvider._
override def getTable(options: DataSourceOptions): Table = {
@@ -68,12 +68,6 @@ class RateStreamProvider extends DataSourceV2
new RateStreamTable(rowsPerSecond, rampUpTimeSeconds, numPartitions)
}
- override def createContinuousReadSupport(
- checkpointLocation: String,
- options: DataSourceOptions): ContinuousReadSupport = {
- new RateStreamContinuousReadSupport(options)
- }
-
override def shortName(): String = "rate"
}
@@ -81,7 +75,7 @@ class RateStreamTable(
rowsPerSecond: Long,
rampUpTimeSeconds: Long,
numPartitions: Int)
- extends Table with SupportsMicroBatchRead {
+ extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
override def name(): String = {
s"RateStream(rowsPerSecond=$rowsPerSecond, rampUpTimeSeconds=$rampUpTimeSeconds, " +
@@ -98,6 +92,10 @@ class RateStreamTable(
new RateStreamMicroBatchStream(
rowsPerSecond, rampUpTimeSeconds, numPartitions, options, checkpointLocation)
}
+
+ override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+ new RateStreamContinuousStream(rowsPerSecond, numPartitions, options)
+ }
}
}
}
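The rate source above shows the table-level side of the refactor: a Table opts into continuous scans by mixing in SupportsContinuousRead and letting its Scan build the stream. A minimal sketch under the same interfaces (the class name and the dummy schema are illustrative only):

import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.types.StructType

class SketchTable extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
  override def name(): String = "sketch"
  override def schema(): StructType = new StructType().add("value", "long")

  // The capability mix-ins only advertise support; the Scan picks the concrete stream
  // for whichever trigger the query runs with.
  override def newScanBuilder(options: DataSourceOptions): ScanBuilder =
    new ScanBuilder with Scan {
      override def build(): Scan = this
      override def readSchema(): StructType = schema()
      override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = ???
      override def toContinuousStream(checkpointLocation: String): ContinuousStream = ???
    }
}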
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index ddf398b..540131c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -26,7 +26,6 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.ListBuffer
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.LongOffset
@@ -40,7 +39,8 @@ import org.apache.spark.unsafe.types.UTF8String
* and debugging. This MicroBatchReadSupport will *not* work in production applications due to
* multiple reasons, including no support for fault recovery.
*/
-class TextSocketMicroBatchStream(host: String, port: Int, options: DataSourceOptions)
+class TextSocketMicroBatchStream(
+ host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
extends MicroBatchStream with Logging {
@GuardedBy("this")
@@ -124,10 +124,6 @@ class TextSocketMicroBatchStream(host: String, port: Int, options: DataSourceOptions)
batches.slice(sliceStart, sliceEnd)
}
- assert(SparkSession.getActiveSession.isDefined)
- val spark = SparkSession.getActiveSession.get
- val numPartitions = spark.sparkContext.defaultParallelism
-
val slices = Array.fill(numPartitions)(new ListBuffer[(UTF8String, Long)])
rawList.zipWithIndex.foreach { case (r, idx) =>
slices(idx % numPartitions).append(r)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index 3500778..c3b24a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -24,16 +24,15 @@ import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReadSupport
+import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousStream
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
class TextSocketSourceProvider extends DataSourceV2
- with TableProvider with ContinuousReadSupportProvider
- with DataSourceRegister with Logging {
+ with TableProvider with DataSourceRegister with Logging {
private def checkParameters(params: DataSourceOptions): Unit = {
logWarning("The socket source should not be used for production applications! " +
@@ -58,22 +57,16 @@ class TextSocketSourceProvider extends DataSourceV2
new TextSocketTable(
options.get("host").get,
options.getInt("port", -1),
+ options.getInt("numPartitions", SparkSession.active.sparkContext.defaultParallelism),
options.getBoolean("includeTimestamp", false))
}
- override def createContinuousReadSupport(
- checkpointLocation: String,
- options: DataSourceOptions): ContinuousReadSupport = {
- checkParameters(options)
- new TextSocketContinuousReadSupport(options)
- }
-
/** String that represents the format that this data source provider uses. */
override def shortName(): String = "socket"
}
-class TextSocketTable(host: String, port: Int, includeTimestamp: Boolean)
- extends Table with SupportsMicroBatchRead {
+class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimestamp: Boolean)
+ extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
override def name(): String = s"Socket[$host:$port]"
@@ -90,7 +83,11 @@ class TextSocketTable(host: String, port: Int, includeTimestamp: Boolean)
override def readSchema(): StructType = schema()
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
- new TextSocketMicroBatchStream(host, port, options)
+ new TextSocketMicroBatchStream(host, port, numPartitions, options)
+ }
+
+ override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+ new TextSocketContinuousStream(host, port, numPartitions, options)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 417dd55..8666818 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -30,9 +30,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
/**
* Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
@@ -183,39 +181,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
case _ => provider.getTable(dsOptions)
}
table match {
- case s: SupportsMicroBatchRead =>
+ case _: SupportsMicroBatchRead | _: SupportsContinuousRead =>
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
- provider, source, s, options,
- table.schema.toAttributes, v1Relation)(sparkSession))
-
- case _ if ds.isInstanceOf[ContinuousReadSupportProvider] =>
- val provider = ds.asInstanceOf[ContinuousReadSupportProvider]
- var tempReadSupport: ContinuousReadSupport = null
- val schema = try {
- val tmpCheckpointPath = Utils.createTempDir(namePrefix = s"tempCP").getCanonicalPath
- tempReadSupport = if (userSpecifiedSchema.isDefined) {
- provider.createContinuousReadSupport(
- userSpecifiedSchema.get, tmpCheckpointPath, dsOptions)
- } else {
- provider.createContinuousReadSupport(tmpCheckpointPath, dsOptions)
- }
- tempReadSupport.fullSchema()
- } finally {
- // Stop tempReader to avoid side-effect thing
- if (tempReadSupport != null) {
- tempReadSupport.stop()
- tempReadSupport = null
- }
- }
- Dataset.ofRows(
- sparkSession,
- // TODO: do not pass null as table after finish the API refactor for continuous
- // stream.
- StreamingRelationV2(
- provider, source, table = null, options,
- schema.toAttributes, v1Relation)(sparkSession))
+ provider, source, table, options, table.schema.toAttributes, v1Relation)(
+ sparkSession))
// fallback to v1
case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
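Nothing changes for end users: both trigger modes now flow through the same StreamingRelationV2 path shown above, without the temporary-checkpoint schema probe. An illustrative query against the built-in rate source, assuming an active SparkSession named spark (the checkpoint path is arbitrary):

import org.apache.spark.sql.streaming.Trigger

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/rate-continuous-checkpoint")
  .trigger(Trigger.Continuous("1 second"))
  .start()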
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index d40a1fd..d0418f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.streaming.Offset
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.util.ManualClock
@@ -308,30 +308,17 @@ class RateStreamProviderSuite extends StreamTest {
"rate source does not support user-specified schema"))
}
- test("continuous in registry") {
- DataSource.lookupDataSource("rate", spark.sqlContext.conf).
- getConstructor().newInstance() match {
- case ds: ContinuousReadSupportProvider =>
- val readSupport = ds.createContinuousReadSupport(
- "", DataSourceOptions.empty())
- assert(readSupport.isInstanceOf[RateStreamContinuousReadSupport])
- case _ =>
- throw new IllegalStateException("Could not find read support for continuous rate")
- }
- }
-
test("continuous data") {
- val readSupport = new RateStreamContinuousReadSupport(
- new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
- val config = readSupport.newScanConfigBuilder(readSupport.initialOffset).build()
- val tasks = readSupport.planInputPartitions(config)
- val readerFactory = readSupport.createContinuousReaderFactory(config)
- assert(tasks.size == 2)
+ val stream = new RateStreamContinuousStream(
+ rowsPerSecond = 20, numPartitions = 2, options = DataSourceOptions.empty())
+ val partitions = stream.planInputPartitions(stream.initialOffset)
+ val readerFactory = stream.createContinuousReaderFactory()
+ assert(partitions.size == 2)
val data = scala.collection.mutable.ListBuffer[InternalRow]()
- tasks.foreach {
+ partitions.foreach {
case t: RateStreamContinuousInputPartition =>
- val startTimeMs = readSupport.initialOffset()
+ val startTimeMs = stream.initialOffset()
.asInstanceOf[RateStreamOffset]
.partitionToValueAndRunTimeMs(t.partitionIndex)
.runTimeMs
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index cf069d5..33c65d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
@@ -294,25 +295,25 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
serverThread = new ServerThread()
serverThread.start()
- val readSupport = new TextSocketContinuousReadSupport(
- new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
- "port" -> serverThread.port.toString).asJava))
-
- val scanConfig = readSupport.newScanConfigBuilder(readSupport.initialOffset()).build()
- val tasks = readSupport.planInputPartitions(scanConfig)
- assert(tasks.size == 2)
+ val stream = new TextSocketContinuousStream(
+ host = "localhost",
+ port = serverThread.port,
+ numPartitions = 2,
+ options = DataSourceOptions.empty())
+ val partitions = stream.planInputPartitions(stream.initialOffset())
+ assert(partitions.length == 2)
val numRecords = 10
val data = scala.collection.mutable.ListBuffer[Int]()
val offsets = scala.collection.mutable.ListBuffer[Int]()
- val readerFactory = readSupport.createContinuousReaderFactory(scanConfig)
+ val readerFactory = stream.createContinuousReaderFactory()
import org.scalatest.time.SpanSugar._
failAfter(5 seconds) {
// inject rows, read and check the data and offsets
for (i <- 0 until numRecords) {
serverThread.enqueue(i.toString)
}
- tasks.foreach {
+ partitions.foreach {
case t: TextSocketContinuousInputPartition =>
val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
for (i <- 0 until numRecords / 2) {
@@ -330,15 +331,15 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
data.clear()
case _ => throw new IllegalStateException("Unexpected task type")
}
- assert(readSupport.startOffset.offsets == List(3, 3))
- readSupport.commit(TextSocketOffset(List(5, 5)))
- assert(readSupport.startOffset.offsets == List(5, 5))
+ assert(stream.startOffset.offsets == List(3, 3))
+ stream.commit(TextSocketOffset(List(5, 5)))
+ assert(stream.startOffset.offsets == List(5, 5))
}
def commitOffset(partition: Int, offset: Int): Unit = {
- val offsetsToCommit = readSupport.startOffset.offsets.updated(partition, offset)
- readSupport.commit(TextSocketOffset(offsetsToCommit))
- assert(readSupport.startOffset.offsets == offsetsToCommit)
+ val offsetsToCommit = stream.startOffset.offsets.updated(partition, offset)
+ stream.commit(TextSocketOffset(offsetsToCommit))
+ assert(stream.startOffset.offsets == offsetsToCommit)
}
}
@@ -346,13 +347,15 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
serverThread = new ServerThread()
serverThread.start()
- val readSupport = new TextSocketContinuousReadSupport(
- new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
- "port" -> serverThread.port.toString).asJava))
+ val stream = new TextSocketContinuousStream(
+ host = "localhost",
+ port = serverThread.port,
+ numPartitions = 2,
+ options = DataSourceOptions.empty())
- readSupport.startOffset = TextSocketOffset(List(5, 5))
+ stream.startOffset = TextSocketOffset(List(5, 5))
assertThrows[IllegalStateException] {
- readSupport.commit(TextSocketOffset(List(6, 6)))
+ stream.commit(TextSocketOffset(List(6, 6)))
}
}
@@ -360,21 +363,21 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
serverThread = new ServerThread()
serverThread.start()
- val readSupport = new TextSocketContinuousReadSupport(
- new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
- "includeTimestamp" -> "true",
- "port" -> serverThread.port.toString).asJava))
- val scanConfig = readSupport.newScanConfigBuilder(readSupport.initialOffset()).build()
- val tasks = readSupport.planInputPartitions(scanConfig)
- assert(tasks.size == 2)
+ val stream = new TextSocketContinuousStream(
+ host = "localhost",
+ port = serverThread.port,
+ numPartitions = 2,
+ options = new DataSourceOptions(Map("includeTimestamp" -> "true").asJava))
+ val partitions = stream.planInputPartitions(stream.initialOffset())
+ assert(partitions.size == 2)
val numRecords = 4
// inject rows, read and check the data and offsets
for (i <- 0 until numRecords) {
serverThread.enqueue(i.toString)
}
- val readerFactory = readSupport.createContinuousReaderFactory(scanConfig)
- tasks.foreach {
+ val readerFactory = stream.createContinuousReaderFactory()
+ partitions.foreach {
case t: TextSocketContinuousInputPartition =>
val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
for (_ <- 0 until numRecords / 2) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index c60ea4a..511fdfe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -24,7 +24,7 @@ import test.org.apache.spark.sql.sources.v2._
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation}
import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.functions._
@@ -40,14 +40,14 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
private def getBatch(query: DataFrame): AdvancedBatch = {
query.queryExecution.executedPlan.collect {
- case d: DataSourceV2ScanExec =>
+ case d: BatchScanExec =>
d.batch.asInstanceOf[AdvancedBatch]
}.head
}
private def getJavaBatch(query: DataFrame): JavaAdvancedDataSourceV2.AdvancedBatch = {
query.queryExecution.executedPlan.collect {
- case d: DataSourceV2ScanExec =>
+ case d: BatchScanExec =>
d.batch.asInstanceOf[JavaAdvancedDataSourceV2.AdvancedBatch]
}.head
}
@@ -309,7 +309,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
assert(logical.canonicalized.output.length == logicalNumOutput)
val physical = df.queryExecution.executedPlan.collect {
- case d: DataSourceV2ScanExec => d
+ case d: BatchScanExec => d
}.head
assert(physical.canonicalized.output.length == physicalNumOutput)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 868b43c..659deb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -495,7 +495,7 @@ class StreamSuite extends StreamTest {
// `extended = false` only displays the physical plan.
assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithoutExtended).size === 0)
- assert("ScanV2".r
+ assert("BatchScan".r
.findAllMatchIn(explainWithoutExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming physical plan
assert(explainWithoutExtended.contains("StateStoreRestore"))
@@ -505,7 +505,7 @@ class StreamSuite extends StreamTest {
// plan.
assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithExtended).size === 3)
- assert("ScanV2".r
+ assert("BatchScan".r
.findAllMatchIn(explainWithExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming physical plan
assert(explainWithExtended.contains("StateStoreRestore"))
@@ -548,17 +548,17 @@ class StreamSuite extends StreamTest {
val explainWithoutExtended = q.explainInternal(false)
// `extended = false` only displays the physical plan.
- assert("Streaming RelationV2 ContinuousMemoryStream".r
+ assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithoutExtended).size === 0)
- assert("ScanV2 ContinuousMemoryStream".r
+ assert("ContinuousScan".r
.findAllMatchIn(explainWithoutExtended).size === 1)
val explainWithExtended = q.explainInternal(true)
// `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
// plan.
- assert("Streaming RelationV2 ContinuousMemoryStream".r
+ assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithExtended).size === 3)
- assert("ScanV2 ContinuousMemoryStream".r
+ assert("ContinuousScan".r
.findAllMatchIn(explainWithExtended).size === 1)
} finally {
q.stop()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index b4bd6f6..da49683 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -39,12 +39,11 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.AllTuples
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation}
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -692,16 +691,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
case r: StreamingExecutionRelation => r.source
// v2 source
case r: StreamingDataSourceV2Relation => r.stream
- case r: OldStreamingDataSourceV2Relation => r.readSupport
// We can add data to memory stream before starting it. Then the input plan has
// not been processed by the streaming engine and contains `StreamingRelationV2`.
case r: StreamingRelationV2 if r.sourceName == "memory" =>
- // TODO: remove this null hack after finish API refactor for continuous stream.
- if (r.table == null) {
- r.dataSource.asInstanceOf[ContinuousReadSupport]
- } else {
- r.table.asInstanceOf[MemoryStreamTable].stream
- }
+ r.table.asInstanceOf[MemoryStreamTable].stream
}
.zipWithIndex
.find(_._1 == source)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 62fde98..dc22e31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, ScanConfig}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
import org.apache.spark.sql.types.StructType
@@ -911,7 +911,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
testStream(df, useV2Sink = true)(
StartStream(trigger = Trigger.Continuous(100)),
- AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
+ AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingDataSourceV2Relation"))
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index d6819ea..d3d210c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
@@ -44,7 +44,7 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
super.beforeEach()
epochEndpoint = EpochCoordinatorRef.create(
mock[StreamingWriteSupport],
- mock[ContinuousReadSupport],
+ mock[ContinuousStream],
mock[ContinuousExecution],
coordinatorId,
startEpoch,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index f85cae9..344a8aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -41,7 +41,7 @@ class ContinuousSuiteBase extends StreamTest {
case s: ContinuousExecution =>
assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
val reader = s.lastExecution.executedPlan.collectFirst {
- case ContinuousScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r
+ case ContinuousScanExec(_, _, r: RateStreamContinuousStream, _) => r
}.get
val deltaMs = numTriggers * 1000 + 300
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index e644c16..a0b56ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.LocalSparkSession
import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.test.TestSparkSession
@@ -45,7 +45,7 @@ class EpochCoordinatorSuite
private var orderVerifier: InOrder = _
override def beforeEach(): Unit = {
- val reader = mock[ContinuousReadSupport]
+ val stream = mock[ContinuousStream]
writeSupport = mock[StreamingWriteSupport]
query = mock[ContinuousExecution]
orderVerifier = inOrder(writeSupport, query)
@@ -53,7 +53,7 @@ class EpochCoordinatorSuite
spark = new TestSparkSession()
epochCoordinator
- = EpochCoordinatorRef.create(writeSupport, reader, query, "test", 1, spark, SparkEnv.get)
+ = EpochCoordinatorRef.create(writeSupport, stream, query, "test", 1, spark, SparkEnv.get)
}
test("single epoch") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index d98cc41..62f1666 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -31,33 +31,23 @@ import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
-class FakeDataStream extends MicroBatchStream {
+class FakeDataStream extends MicroBatchStream with ContinuousStream {
override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
override def commit(end: Offset): Unit = {}
override def stop(): Unit = {}
override def initialOffset(): Offset = RateStreamOffset(Map())
override def latestOffset(): Offset = RateStreamOffset(Map())
+ override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
throw new IllegalStateException("fake source - cannot actually read")
}
- override def createReaderFactory(): PartitionReaderFactory = {
+ override def planInputPartitions(start: Offset): Array[InputPartition] = {
throw new IllegalStateException("fake source - cannot actually read")
}
-}
-
-case class FakeReadSupport() extends ContinuousReadSupport {
- override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
- override def commit(end: Offset): Unit = {}
- override def stop(): Unit = {}
- override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
- override def fullSchema(): StructType = StructType(Seq())
- override def initialOffset(): Offset = RateStreamOffset(Map())
- override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = null
- override def createContinuousReaderFactory(
- config: ScanConfig): ContinuousPartitionReaderFactory = {
+ override def createReaderFactory(): PartitionReaderFactory = {
throw new IllegalStateException("fake source - cannot actually read")
}
- override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
+ override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
throw new IllegalStateException("fake source - cannot actually read")
}
}
@@ -66,21 +56,19 @@ class FakeScanBuilder extends ScanBuilder with Scan {
override def build(): Scan = this
override def readSchema(): StructType = StructType(Seq())
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = new FakeDataStream
+ override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream
}
-class FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
+trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
}
-trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
- override def createContinuousReadSupport(
- checkpointLocation: String,
- options: DataSourceOptions): ContinuousReadSupport = {
- LastReadOptions.options = options
- FakeReadSupport()
- }
+trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Seq())
+ override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
}
trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
@@ -111,27 +99,34 @@ class FakeReadMicroBatchOnly
class FakeReadContinuousOnly
extends DataSourceRegister
with TableProvider
- with FakeContinuousReadSupportProvider
with SessionConfigSupport {
override def shortName(): String = "fake-read-continuous-only"
override def keyPrefix: String = shortName()
- override def getTable(options: DataSourceOptions): Table = new Table {
- override def schema(): StructType = StructType(Seq())
- override def name(): String = "fake"
+ override def getTable(options: DataSourceOptions): Table = {
+ LastReadOptions.options = options
+ new FakeContinuousReadTable {}
}
}
-class FakeReadBothModes extends DataSourceRegister
- with TableProvider with FakeContinuousReadSupportProvider {
+class FakeReadBothModes extends DataSourceRegister with TableProvider {
override def shortName(): String = "fake-read-microbatch-continuous"
- override def getTable(options: DataSourceOptions): Table = new FakeMicroBatchReadTable {}
+ override def getTable(options: DataSourceOptions): Table = {
+ new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {}
+ }
}
-class FakeReadNeitherMode extends DataSourceRegister {
+class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
override def shortName(): String = "fake-read-neither-mode"
+
+ override def getTable(options: DataSourceOptions): Table = {
+ new Table {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Nil)
+ }
+ }
}
class FakeWriteSupportProvider
@@ -324,33 +319,25 @@ class StreamingDataSourceV2Suite extends StreamTest {
for ((read, write, trigger) <- cases) {
testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
- val readSource = DataSource.lookupDataSource(read, spark.sqlContext.conf).
- getConstructor().newInstance()
+ val table = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+ .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).
getConstructor().newInstance()
- def isMicroBatch(ds: Any): Boolean = ds match {
- case provider: TableProvider =>
- val table = provider.getTable(DataSourceOptions.empty())
- table.isInstanceOf[SupportsMicroBatchRead]
- case _ => false
- }
-
- (readSource, writeSource, trigger) match {
+ (table, writeSource, trigger) match {
// Valid microbatch queries.
- case (_: TableProvider, _: StreamingWriteSupportProvider, t)
- if isMicroBatch(readSource) && !t.isInstanceOf[ContinuousTrigger] =>
+ case (_: SupportsMicroBatchRead, _: StreamingWriteSupportProvider, t)
+ if !t.isInstanceOf[ContinuousTrigger] =>
testPositiveCase(read, write, trigger)
// Valid continuous queries.
- case (_: ContinuousReadSupportProvider, _: StreamingWriteSupportProvider,
+ case (_: SupportsContinuousRead, _: StreamingWriteSupportProvider,
_: ContinuousTrigger) =>
testPositiveCase(read, write, trigger)
// Invalid - can't read at all
- case (r, _, _)
- if !r.isInstanceOf[TableProvider]
- && !r.isInstanceOf[ContinuousReadSupportProvider] =>
+ case (r, _, _) if !r.isInstanceOf[SupportsMicroBatchRead] &&
+ !r.isInstanceOf[SupportsContinuousRead] =>
testNegativeCase(read, write, trigger,
s"Data source $read does not support streamed reading")
@@ -361,14 +348,13 @@ class StreamingDataSourceV2Suite extends StreamTest {
// Invalid - trigger is continuous but reader is not
case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
- if !r.isInstanceOf[ContinuousReadSupportProvider] =>
+ if !r.isInstanceOf[SupportsContinuousRead] =>
testNegativeCase(read, write, trigger,
s"Data source $read does not support continuous processing")
// Invalid - trigger is microbatch but reader is not
- case (r, _, t)
- if !isMicroBatch(r) &&
- !t.isInstanceOf[ContinuousTrigger] =>
+ case (r, _, t) if !r.isInstanceOf[SupportsMicroBatchRead] &&
+ !t.isInstanceOf[ContinuousTrigger] =>
testPostCreationNegativeCase(read, write, trigger,
s"Data source $read does not support microbatch processing")
}
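As a summary of the read-path call sequence the rewritten suites exercise, a test-style sketch that drives any ContinuousStream directly (the helper name and the per-partition row count are arbitrary; continuous readers may block in next() until data arrives):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream

def readRowsOnce(stream: ContinuousStream, rowsPerPartition: Int): Seq[InternalRow] = {
  val partitions = stream.planInputPartitions(stream.initialOffset())
  val readerFactory = stream.createContinuousReaderFactory()
  partitions.toSeq.flatMap { partition =>
    val reader = readerFactory.createReader(partition)
    val rows = (0 until rowsPerPartition).map { _ =>
      assert(reader.next())   // blocks until the next record is available
      reader.get().copy()     // copy, since readers may reuse the returned row
    }
    reader.close()
    rows
  }
}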