Posted to commits@spark.apache.org by li...@apache.org on 2019/02/19 00:17:37 UTC
[spark] branch master updated: [SPARK-26785][SQL] data source v2 API refactor: streaming write
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f85ed9a [SPARK-26785][SQL] data source v2 API refactor: streaming write
f85ed9a is described below
commit f85ed9a3e55083b0de0e20a37775efa92d248a4f
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Feb 18 16:17:24 2019 -0800
[SPARK-26785][SQL] data source v2 API refactor: streaming write
## What changes were proposed in this pull request?
Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing).
The major changes:
1. rename `StreamingWriteSupport` to `StreamingWrite`
2. add `WriteBuilder.buildForStreaming`
3. update existing sinks to move the creation of `StreamingWrite` to `Table` (a sketch of the resulting pattern is shown right below)
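A minimal sketch of the sink pattern this refactor moves towards: the provider returns a Table that mixes in SupportsStreamingWrite, and the WriteBuilder builds the StreamingWrite. Names such as MySinkProvider, MySinkTable and MyStreamingWrite are hypothetical; the real instances are the Kafka, console, foreach and memory sinks updated in this patch.

    import org.apache.spark.sql.sources.DataSourceRegister
    import org.apache.spark.sql.sources.v2._
    import org.apache.spark.sql.sources.v2.writer.WriteBuilder
    import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType

    class MySinkProvider extends TableProvider with DataSourceRegister {
      override def shortName(): String = "my-sink"                      // hypothetical name
      override def getTable(options: DataSourceOptions): Table = MySinkTable
    }

    object MySinkTable extends Table with SupportsStreamingWrite {
      override def name(): String = "my-sink-table"
      override def schema(): StructType = StructType(Nil)

      override def newWriteBuilder(options: DataSourceOptions): WriteBuilder =
        new WriteBuilder with SupportsOutputMode {
          private var inputSchema: StructType = _

          override def withInputDataSchema(schema: StructType): WriteBuilder = {
            this.inputSchema = schema
            this
          }

          // OutputMode is accepted but not acted on yet; SPARK-26666 will replace this.
          override def outputMode(mode: OutputMode): WriteBuilder = this

          // This replaces the removed createStreamingWriteSupport() provider hook.
          override def buildForStreaming(): StreamingWrite =
            new MyStreamingWrite(inputSchema)                           // hypothetical StreamingWrite impl
        }
    }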
## How was this patch tested?
existing tests
Closes #23702 from cloud-fan/stream-write.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
---
.../spark/sql/kafka010/KafkaSourceProvider.scala | 42 ++++++-----
...riteSupport.scala => KafkaStreamingWrite.scala} | 8 +-
.../spark/sql/sources/v2/SessionConfigSupport.java | 4 +-
.../sources/v2/StreamingWriteSupportProvider.java | 54 -------------
.../spark/sql/sources/v2/SupportsBatchWrite.java | 2 +-
...BatchWrite.java => SupportsStreamingWrite.java} | 9 ++-
.../apache/spark/sql/sources/v2/TableProvider.java | 3 +-
.../spark/sql/sources/v2/writer/WriteBuilder.java | 9 ++-
.../sql/sources/v2/writer/WriterCommitMessage.java | 4 +-
.../streaming/StreamingDataWriterFactory.java | 2 +-
...eamingWriteSupport.java => StreamingWrite.java} | 21 +++++-
.../streaming/SupportsOutputMode.java} | 17 +++--
.../org/apache/spark/sql/DataFrameReader.scala | 2 +-
.../datasources/noop/NoopDataSource.scala | 26 +++----
.../datasources/v2/DataSourceV2StringFormat.scala | 88 ----------------------
.../datasources/v2/DataSourceV2Utils.scala | 43 +++++------
.../execution/streaming/MicroBatchExecution.scala | 20 +++--
.../execution/streaming/StreamingRelation.scala | 6 +-
.../spark/sql/execution/streaming/console.scala | 43 ++++++++---
.../streaming/continuous/ContinuousExecution.scala | 25 +++---
.../streaming/continuous/EpochCoordinator.scala | 6 +-
.../continuous/WriteToContinuousDataSource.scala | 6 +-
.../WriteToContinuousDataSourceExec.scala | 13 ++--
...onsoleWriteSupport.scala => ConsoleWrite.scala} | 6 +-
...portProvider.scala => ForeachWriterTable.scala} | 76 +++++++++++--------
.../streaming/sources/MicroBatchWrite.scala | 4 +-
.../streaming/sources/RateStreamProvider.scala | 3 +-
.../sources/TextSocketSourceProvider.scala | 3 +-
.../sql/execution/streaming/sources/memoryV2.scala | 42 +++++++----
.../spark/sql/streaming/DataStreamReader.scala | 2 +-
.../spark/sql/streaming/DataStreamWriter.scala | 50 ++++++------
.../sql/streaming/StreamingQueryManager.scala | 4 +-
...org.apache.spark.sql.sources.DataSourceRegister | 2 +-
.../execution/streaming/MemorySinkV2Suite.scala | 6 +-
.../sql/sources/v2/DataSourceV2UtilsSuite.scala | 4 +-
.../sql/sources/v2/SimpleWritableDataSource.scala | 3 +-
.../ContinuousQueuedDataReaderSuite.scala | 4 +-
.../continuous/EpochCoordinatorSuite.scala | 6 +-
.../sources/StreamingDataSourceV2Suite.scala | 70 ++++++++++-------
39 files changed, 345 insertions(+), 393 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 9238899b..6994517 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -47,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
- with StreamingWriteSupportProvider
with TableProvider
with Logging {
import KafkaSourceProvider._
@@ -180,20 +180,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}
- override def createStreamingWriteSupport(
- queryId: String,
- schema: StructType,
- mode: OutputMode,
- options: DataSourceOptions): StreamingWriteSupport = {
- import scala.collection.JavaConverters._
-
- val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
- // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
- val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
-
- new KafkaStreamingWriteSupport(topic, producerParams, schema)
- }
-
private def strategy(caseInsensitiveParams: Map[String, String]) =
caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
case ("assign", value) =>
@@ -365,7 +351,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
class KafkaTable(strategy: => ConsumerStrategy) extends Table
- with SupportsMicroBatchRead with SupportsContinuousRead {
+ with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
override def name(): String = s"Kafka $strategy"
@@ -374,6 +360,28 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
override def build(): Scan = new KafkaScan(options)
}
+
+ override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ new WriteBuilder with SupportsOutputMode {
+ private var inputSchema: StructType = _
+
+ override def withInputDataSchema(schema: StructType): WriteBuilder = {
+ this.inputSchema = schema
+ this
+ }
+
+ override def outputMode(mode: OutputMode): WriteBuilder = this
+
+ override def buildForStreaming(): StreamingWrite = {
+ import scala.collection.JavaConverters._
+
+ assert(inputSchema != null)
+ val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
+ val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
+ new KafkaStreamingWrite(topic, producerParams, inputSchema)
+ }
+ }
+ }
}
class KafkaScan(options: DataSourceOptions) extends Scan {
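From the user's point of view the Kafka sink is unchanged by this refactor; for illustration, assuming `df` is a streaming DataFrame with the expected value/topic columns (broker address, topic name and checkpoint path below are placeholders):

    val query = df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")    // placeholder broker address
      .option("topic", "events")                           // maps to TOPIC_OPTION_KEY above
      .option("checkpointLocation", "/tmp/checkpoints")    // placeholder path
      .start()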
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
similarity index 95%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
rename to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
index 0d831c3..e3101e1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
/**
@@ -33,18 +33,18 @@ import org.apache.spark.sql.types.StructType
case object KafkaWriterCommitMessage extends WriterCommitMessage
/**
- * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory.
+ * A [[StreamingWrite]] for Kafka writing. Responsible for generating the writer factory.
*
* @param topic The topic this writer is responsible for. If None, topic will be inferred from
* a `topic` field in the incoming data.
* @param producerParams Parameters for Kafka producers in each task.
* @param schema The schema of the input data.
*/
-class KafkaStreamingWriteSupport(
+class KafkaStreamingWrite(
topic: Option[String],
producerParams: ju.Map[String, Object],
schema: StructType)
- extends StreamingWriteSupport {
+ extends StreamingWrite {
validateQuery(schema.toAttributes, producerParams, topic)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
index c00abd9..d27fbfd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
@@ -20,12 +20,12 @@ package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.Evolving;
/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * A mix-in interface for {@link TableProvider}. Data sources can implement this interface to
* propagate session configs with the specified key-prefix to all data source operations in this
* session.
*/
@Evolving
-public interface SessionConfigSupport extends DataSourceV2 {
+public interface SessionConfigSupport extends TableProvider {
/**
* Key prefix of the session configs to propagate, which is usually the data source name. Spark
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
deleted file mode 100644
index 8ac9c51..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for structured streaming.
- *
- * This interface is used to create {@link StreamingWriteSupport} instances when end users run
- * {@code Dataset.writeStream.format(...).option(...).start()}.
- */
-@Evolving
-public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
-
- /**
- * Creates a {@link StreamingWriteSupport} instance to save the data to this data source, which is
- * called by Spark at the beginning of each streaming query.
- *
- * @param queryId A unique string for the writing query. It's possible that there are many
- * writing queries running at the same time, and the returned
- * {@link StreamingWriteSupport} can use this id to distinguish itself from others.
- * @param schema the schema of the data to be written.
- * @param mode the output mode which determines what successive epoch output means to this
- * sink, please refer to {@link OutputMode} for more details.
- * @param options the options for the returned data source writer, which is an immutable
- * case-insensitive string-to-string map.
- */
- StreamingWriteSupport createStreamingWriteSupport(
- String queryId,
- StructType schema,
- OutputMode mode,
- DataSourceOptions options);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
index 08caadd..b2cd97a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
@@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
* An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
* <p>
* If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
+ * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
* with {@link WriteBuilder#buildForBatch()} implemented.
* </p>
*/
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
similarity index 76%
copy from sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
copy to sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
index 08caadd..1050d35 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
@@ -18,15 +18,16 @@
package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
/**
- * An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
+ * An empty mix-in interface for {@link Table}, to indicate this table supports streaming write.
* <p>
* If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
- * with {@link WriteBuilder#buildForBatch()} implemented.
+ * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
+ * with {@link WriteBuilder#buildForStreaming()} implemented.
* </p>
*/
@Evolving
-public interface SupportsBatchWrite extends SupportsWrite {}
+public interface SupportsStreamingWrite extends SupportsWrite, BaseStreamingSink { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
index 855d5ef..a9b83b6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
@@ -29,8 +29,7 @@ import org.apache.spark.sql.types.StructType;
* </p>
*/
@Evolving
-// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely.
-public interface TableProvider extends DataSourceV2 {
+public interface TableProvider {
/**
* Return a {@link Table} instance to do read/write with user-specified options.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
index e861c72..07529fe 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
@@ -20,6 +20,7 @@ package org.apache.spark.sql.sources.v2.writer;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;
import org.apache.spark.sql.types.StructType;
/**
@@ -64,6 +65,12 @@ public interface WriteBuilder {
* {@link SupportsSaveMode}.
*/
default BatchWrite buildForBatch() {
- throw new UnsupportedOperationException("Batch scans are not supported");
+ throw new UnsupportedOperationException(getClass().getName() +
+ " does not support batch write");
+ }
+
+ default StreamingWrite buildForStreaming() {
+ throw new UnsupportedOperationException(getClass().getName() +
+ " does not support streaming write");
}
}
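With both build methods defaulting to a descriptive failure, a builder only needs to override the path it actually supports. A hypothetical streaming-only builder, for example:

    import org.apache.spark.sql.sources.v2.writer.WriteBuilder
    import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite

    // Hypothetical: a builder that only supports streaming writes.
    class StreamingOnlyWriteBuilder(write: StreamingWrite) extends WriteBuilder {
      override def buildForStreaming(): StreamingWrite = write
      // buildForBatch() keeps the default and now fails with
      // "... does not support batch write" (prefixed with the builder class name)
      // instead of the previous, misleading "Batch scans are not supported".
    }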
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
index 6334c8f..23e8580 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
@@ -20,12 +20,12 @@ package org.apache.spark.sql.sources.v2.writer;
import java.io.Serializable;
import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;
/**
* A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
* as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
- * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
+ * {@link StreamingWrite#commit(long, WriterCommitMessage[])}.
*
* This is an empty interface, data sources should define their own message class and use it when
* generating messages at executor side and handling the messages at driver side.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
index 7d3d21c..af2f03c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
/**
* A factory of {@link DataWriter} returned by
- * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating
+ * {@link StreamingWrite#createStreamingWriterFactory()}, which is responsible for creating
* and initializing the actual data writer at executor side.
*
* Note that, the writer factory will be serialized and sent to executors, then the data writer
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWrite.java
similarity index 73%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWrite.java
index 84cfbf2..5617f1c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWrite.java
@@ -22,13 +22,26 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
/**
- * An interface that defines how to write the data to data source for streaming processing.
+ * An interface that defines how to write the data to data source in streaming queries.
*
- * Streaming queries are divided into intervals of data called epochs, with a monotonically
- * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
+ * The writing procedure is:
+ * 1. Create a writer factory by {@link #createStreamingWriterFactory()}, serialize and send it to
+ * all the partitions of the input data(RDD).
+ * 2. For each epoch in each partition, create the data writer, and write the data of the epoch in
+ * the partition with this writer. If all the data are written successfully, call
+ * {@link DataWriter#commit()}. If exception happens during the writing, call
+ * {@link DataWriter#abort()}.
+ * 3. If writers in all partitions of one epoch are successfully committed, call
+ * {@link #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job failed
+ * with an unknown reason, call {@link #abort(long, WriterCommitMessage[])}.
+ *
+ * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
+ * do it manually in their Spark applications if they want to retry.
+ *
+ * Please refer to the documentation of commit/abort methods for detailed specifications.
*/
@Evolving
-public interface StreamingWriteSupport {
+public interface StreamingWrite {
/**
* Creates a writer factory which will be serialized and sent to executors.
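A minimal sketch of that contract, assuming a made-up sink that simply discards data (it mirrors the NoopStreamingWrite added further down in this patch): one factory per query, one commit/abort per epoch.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
    import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}

    // Hypothetical StreamingWrite that drops every row.
    object DiscardingWrite extends StreamingWrite {
      override def createStreamingWriterFactory(): StreamingDataWriterFactory =
        new StreamingDataWriterFactory {
          override def createWriter(
              partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
            new DataWriter[InternalRow] {
              override def write(record: InternalRow): Unit = ()              // drop the row
              override def commit(): WriterCommitMessage = new WriterCommitMessage {}
              override def abort(): Unit = ()
            }
        }
      override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
      override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
    }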
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
similarity index 67%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
index 43bdcca..832dcfa 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java
@@ -15,12 +15,15 @@
* limitations under the License.
*/
-package org.apache.spark.sql.sources.v2;
+package org.apache.spark.sql.sources.v2.writer.streaming;
-import org.apache.spark.annotation.Evolving;
+import org.apache.spark.annotation.Unstable;
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+import org.apache.spark.sql.streaming.OutputMode;
-/**
- * TODO: remove it when we finish the API refactor for streaming write side.
- */
-@Evolving
-public interface DataSourceV2 {}
+// TODO: remove it when we have `SupportsTruncate`
+@Unstable
+public interface SupportsOutputMode extends WriteBuilder {
+
+ WriteBuilder outputMode(OutputMode mode);
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 713c9a9..e757785 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -205,7 +205,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
if (classOf[TableProvider].isAssignableFrom(cls)) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
- ds = provider, conf = sparkSession.sessionState.conf)
+ source = provider, conf = sparkSession.sessionState.conf)
val pathsOption = {
val objectMapper = new ObjectMapper()
DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 452ebbb..8f2072c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -30,30 +30,23 @@ import org.apache.spark.sql.types.StructType
* This is no-op datasource. It does not do anything besides consuming its input.
* This can be useful for benchmarking or to cache data without any additional overhead.
*/
-class NoopDataSource
- extends DataSourceV2
- with TableProvider
- with DataSourceRegister
- with StreamingWriteSupportProvider {
-
+class NoopDataSource extends TableProvider with DataSourceRegister {
override def shortName(): String = "noop"
override def getTable(options: DataSourceOptions): Table = NoopTable
- override def createStreamingWriteSupport(
- queryId: String,
- schema: StructType,
- mode: OutputMode,
- options: DataSourceOptions): StreamingWriteSupport = NoopStreamingWriteSupport
}
-private[noop] object NoopTable extends Table with SupportsBatchWrite {
+private[noop] object NoopTable extends Table with SupportsBatchWrite with SupportsStreamingWrite {
override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
}
-private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsSaveMode {
- override def buildForBatch(): BatchWrite = NoopBatchWrite
+private[noop] object NoopWriteBuilder extends WriteBuilder
+ with SupportsSaveMode with SupportsOutputMode {
override def mode(mode: SaveMode): WriteBuilder = this
+ override def outputMode(mode: OutputMode): WriteBuilder = this
+ override def buildForBatch(): BatchWrite = NoopBatchWrite
+ override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
}
private[noop] object NoopBatchWrite extends BatchWrite {
@@ -72,7 +65,7 @@ private[noop] object NoopWriter extends DataWriter[InternalRow] {
override def abort(): Unit = {}
}
-private[noop] object NoopStreamingWriteSupport extends StreamingWriteSupport {
+private[noop] object NoopStreamingWrite extends StreamingWrite {
override def createStreamingWriterFactory(): StreamingDataWriterFactory =
NoopStreamingDataWriterFactory
override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
@@ -85,4 +78,3 @@ private[noop] object NoopStreamingDataWriterFactory extends StreamingDataWriterF
taskId: Long,
epochId: Long): DataWriter[InternalRow] = NoopWriter
}
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
deleted file mode 100644
index f11703c..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.commons.lang3.StringUtils
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.DataSourceV2
-import org.apache.spark.util.Utils
-
-/**
- * A trait that can be used by data source v2 related query plans(both logical and physical), to
- * provide a string format of the data source information for explain.
- */
-trait DataSourceV2StringFormat {
-
- /**
- * The instance of this data source implementation. Note that we only consider its class in
- * equals/hashCode, not the instance itself.
- */
- def source: DataSourceV2
-
- /**
- * The output of the data source reader, w.r.t. column pruning.
- */
- def output: Seq[Attribute]
-
- /**
- * The options for this data source reader.
- */
- def options: Map[String, String]
-
- /**
- * The filters which have been pushed to the data source.
- */
- def pushedFilters: Seq[Expression]
-
- private def sourceName: String = source match {
- case registered: DataSourceRegister => registered.shortName()
- // source.getClass.getSimpleName can cause Malformed class name error,
- // call safer `Utils.getSimpleName` instead
- case _ => Utils.getSimpleName(source.getClass)
- }
-
- def metadataString(maxFields: Int): String = {
- val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
-
- if (pushedFilters.nonEmpty) {
- entries += "Filters" -> pushedFilters.mkString("[", ", ", "]")
- }
-
- // TODO: we should only display some standard options like path, table, etc.
- if (options.nonEmpty) {
- entries += "Options" -> Utils.redact(options).map {
- case (k, v) => s"$k=$v"
- }.mkString("[", ",", "]")
- }
-
- val outputStr = truncatedString(output, "[", ", ", "]", maxFields)
-
- val entriesStr = if (entries.nonEmpty) {
- truncatedString(entries.map {
- case (key, value) => key + ": " + StringUtils.abbreviate(value, 100)
- }, " (", ", ", ")", maxFields)
- } else {
- ""
- }
-
- s"$sourceName$outputStr$entriesStr"
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index e9cc399..30897d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -21,8 +21,7 @@ import java.util.regex.Pattern
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}
+import org.apache.spark.sql.sources.v2.{SessionConfigSupport, TableProvider}
private[sql] object DataSourceV2Utils extends Logging {
@@ -34,34 +33,28 @@ private[sql] object DataSourceV2Utils extends Logging {
* `spark.datasource.$keyPrefix`. A session config `spark.datasource.$keyPrefix.xxx -> yyy` will
* be transformed into `xxx -> yyy`.
*
- * @param ds a [[DataSourceV2]] object
+ * @param source a [[TableProvider]] object
* @param conf the session conf
* @return an immutable map that contains all the extracted and transformed k/v pairs.
*/
- def extractSessionConfigs(ds: DataSourceV2, conf: SQLConf): Map[String, String] = ds match {
- case cs: SessionConfigSupport =>
- val keyPrefix = cs.keyPrefix()
- require(keyPrefix != null, "The data source config key prefix can't be null.")
-
- val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.+)")
-
- conf.getAllConfs.flatMap { case (key, value) =>
- val m = pattern.matcher(key)
- if (m.matches() && m.groupCount() > 0) {
- Seq((m.group(1), value))
- } else {
- Seq.empty
+ def extractSessionConfigs(source: TableProvider, conf: SQLConf): Map[String, String] = {
+ source match {
+ case cs: SessionConfigSupport =>
+ val keyPrefix = cs.keyPrefix()
+ require(keyPrefix != null, "The data source config key prefix can't be null.")
+
+ val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.+)")
+
+ conf.getAllConfs.flatMap { case (key, value) =>
+ val m = pattern.matcher(key)
+ if (m.matches() && m.groupCount() > 0) {
+ Seq((m.group(1), value))
+ } else {
+ Seq.empty
+ }
}
- }
-
- case _ => Map.empty
- }
- def failForUserSpecifiedSchema[T](ds: DataSourceV2): T = {
- val name = ds match {
- case register: DataSourceRegister => register.shortName()
- case _ => ds.getClass.getName
+ case _ => Map.empty
}
- throw new UnsupportedOperationException(name + " source does not support user-specified schema")
}
}
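For reference, a sketch of how the prefix stripping behaves for a provider that mixes in SessionConfigSupport (the provider name and config key below are made up):

    import org.apache.spark.sql.sources.v2.{DataSourceOptions, SessionConfigSupport, Table, TableProvider}

    // Hypothetical provider whose session configs live under spark.datasource.mysource.*
    class MySourceProvider extends TableProvider with SessionConfigSupport {
      override def keyPrefix(): String = "mysource"
      override def getTable(options: DataSourceOptions): Table = ???   // irrelevant for this example
    }

    // spark.conf.set("spark.datasource.mysource.path", "/tmp/data")
    // DataSourceV2Utils.extractSessionConfigs(new MySourceProvider, spark.sessionState.conf)
    //   returns Map("path" -> "/tmp/data")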
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 2c33975..cca2790 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateCo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock
@@ -513,13 +514,16 @@ class MicroBatchExecution(
val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
- case s: StreamingWriteSupportProvider =>
- val writer = s.createStreamingWriteSupport(
- s"$runId",
- newAttributePlan.schema,
- outputMode,
- new DataSourceOptions(extraOptions.asJava))
- WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
+ case s: SupportsStreamingWrite =>
+ // TODO: we should translate OutputMode to concrete write actions like truncate, but
+ // the truncate action is being developed in SPARK-26666.
+ val writeBuilder = s.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
+ .withQueryId(runId.toString)
+ .withInputDataSchema(newAttributePlan.schema)
+ val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
+ .outputMode(outputMode)
+ .buildForStreaming()
+ WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
@@ -549,7 +553,7 @@ class MicroBatchExecution(
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
sink match {
case s: Sink => s.addBatch(currentBatchId, nextBatch)
- case _: StreamingWriteSupportProvider =>
+ case _: SupportsStreamingWrite =>
// This doesn't accumulate any data - it just forces execution of the microbatch writer.
nextBatch.collect()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 83d38dc..1b7aa54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.sources.v2.{DataSourceV2, Table}
+import org.apache.spark.sql.sources.v2.{Table, TableProvider}
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@@ -86,13 +86,13 @@ case class StreamingExecutionRelation(
// know at read time whether the query is continuous or not, so we need to be able to
// swap a V1 relation back in.
/**
- * Used to link a [[DataSourceV2]] into a streaming
+ * Used to link a [[TableProvider]] into a streaming
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for creating
* a streaming [[org.apache.spark.sql.DataFrame]] from [[org.apache.spark.sql.DataFrameReader]],
* and should be converted before passing to [[StreamExecution]].
*/
case class StreamingRelationV2(
- dataSource: DataSourceV2,
+ source: TableProvider,
sourceName: String,
table: Table,
extraOptions: Map[String, String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 9c5c16f..348bc76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -18,10 +18,11 @@
package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.sources.ConsoleWriteSupport
+import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -30,17 +31,12 @@ case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
override def schema: StructType = data.schema
}
-class ConsoleSinkProvider extends DataSourceV2
- with StreamingWriteSupportProvider
+class ConsoleSinkProvider extends TableProvider
with DataSourceRegister
with CreatableRelationProvider {
- override def createStreamingWriteSupport(
- queryId: String,
- schema: StructType,
- mode: OutputMode,
- options: DataSourceOptions): StreamingWriteSupport = {
- new ConsoleWriteSupport(schema, options)
+ override def getTable(options: DataSourceOptions): Table = {
+ ConsoleTable
}
def createRelation(
@@ -60,3 +56,28 @@ class ConsoleSinkProvider extends DataSourceV2
def shortName(): String = "console"
}
+
+object ConsoleTable extends Table with SupportsStreamingWrite {
+
+ override def name(): String = "console"
+
+ override def schema(): StructType = StructType(Nil)
+
+ override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ new WriteBuilder with SupportsOutputMode {
+ private var inputSchema: StructType = _
+
+ override def withInputDataSchema(schema: StructType): WriteBuilder = {
+ this.inputSchema = schema
+ this
+ }
+
+ override def outputMode(mode: OutputMode): WriteBuilder = this
+
+ override def buildForStreaming(): StreamingWrite = {
+ assert(inputSchema != null)
+ new ConsoleWrite(inputSchema, options)
+ }
+ }
+ }
+}
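The console sink's user-facing behaviour is unchanged; for illustration, with `df` a streaming DataFrame (the numRows option maps to ConsoleWrite's numRowsToShow):

    val query = df.writeStream
      .format("console")
      .option("numRows", 20)       // rows printed per micro-batch, default 20
      .outputMode("append")
      .start()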
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index b22795d..20101c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -32,8 +32,9 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider, SupportsContinuousRead}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock
@@ -42,7 +43,7 @@ class ContinuousExecution(
name: String,
checkpointRoot: String,
analyzedPlan: LogicalPlan,
- sink: StreamingWriteSupportProvider,
+ sink: SupportsStreamingWrite,
trigger: Trigger,
triggerClock: Clock,
outputMode: OutputMode,
@@ -174,12 +175,15 @@ class ContinuousExecution(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
}
- val writer = sink.createStreamingWriteSupport(
- s"$runId",
- withNewSources.schema,
- outputMode,
- new DataSourceOptions(extraOptions.asJava))
- val planWithSink = WriteToContinuousDataSource(writer, withNewSources)
+ // TODO: we should translate OutputMode to concrete write actions like truncate, but
+ // the truncate action is being developed in SPARK-26666.
+ val writeBuilder = sink.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
+ .withQueryId(runId.toString)
+ .withInputDataSchema(withNewSources.schema)
+ val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
+ .outputMode(outputMode)
+ .buildForStreaming()
+ val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
@@ -214,9 +218,8 @@ class ContinuousExecution(
trigger.asInstanceOf[ContinuousTrigger].intervalMs.toString)
// Use the parent Spark session for the endpoint since it's where this query ID is registered.
- val epochEndpoint =
- EpochCoordinatorRef.create(
- writer, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+ val epochEndpoint = EpochCoordinatorRef.create(
+ streamingWrite, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
val epochUpdateThread = new Thread(new Runnable {
override def run: Unit = {
try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index d1bda79..a998422 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeR
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.util.RpcUtils
private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
@@ -82,7 +82,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
* Create a reference to a new [[EpochCoordinator]].
*/
def create(
- writeSupport: StreamingWriteSupport,
+ writeSupport: StreamingWrite,
stream: ContinuousStream,
query: ContinuousExecution,
epochCoordinatorId: String,
@@ -115,7 +115,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
* have both committed and reported an end offset for a given epoch.
*/
private[continuous] class EpochCoordinator(
- writeSupport: StreamingWriteSupport,
+ writeSupport: StreamingWrite,
stream: ContinuousStream,
query: ContinuousExecution,
startEpoch: Long,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
index 7ad21cc..54f484c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.streaming.continuous
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
/**
* The logical plan for writing data in a continuous stream.
*/
-case class WriteToContinuousDataSource(
- writeSupport: StreamingWriteSupport, query: LogicalPlan) extends LogicalPlan {
+case class WriteToContinuousDataSource(write: StreamingWrite, query: LogicalPlan)
+ extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(query)
override def output: Seq[Attribute] = Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index 2178466..2f3af6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -26,21 +26,22 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
/**
- * The physical plan for writing data into a continuous processing [[StreamingWriteSupport]].
+ * The physical plan for writing data into a continuous processing [[StreamingWrite]].
*/
-case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport, query: SparkPlan)
- extends UnaryExecNode with Logging {
+case class WriteToContinuousDataSourceExec(write: StreamingWrite, query: SparkPlan)
+ extends UnaryExecNode with Logging {
+
override def child: SparkPlan = query
override def output: Seq[Attribute] = Nil
override protected def doExecute(): RDD[InternalRow] = {
- val writerFactory = writeSupport.createStreamingWriterFactory()
+ val writerFactory = write.createStreamingWriterFactory()
val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
- logInfo(s"Start processing data source write support: $writeSupport. " +
+ logInfo(s"Start processing data source write support: $write. " +
s"The input RDD has ${rdd.partitions.length} partitions.")
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
similarity index 94%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
index 833e62f..f2ff30b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala
@@ -22,12 +22,12 @@ import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
/** Common methods used to create writes for the the console sink */
-class ConsoleWriteSupport(schema: StructType, options: DataSourceOptions)
- extends StreamingWriteSupport with Logging {
+class ConsoleWrite(schema: StructType, options: DataSourceOptions)
+ extends StreamingWrite with Logging {
// Number of rows to display, by default 20 rows
protected val numRowsToShow = options.getInt("numRows", 20)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
similarity index 66%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 4218fd5..6fbb59c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriteSupportProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -22,63 +22,73 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.python.PythonForeachWriter
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider}
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
- * A [[org.apache.spark.sql.sources.v2.DataSourceV2]] for forwarding data into the specified
- * [[ForeachWriter]].
+ * A write-only table for forwarding data into the specified [[ForeachWriter]].
*
* @param writer The [[ForeachWriter]] to process all data.
* @param converter An object to convert internal rows to target type T. Either it can be
* a [[ExpressionEncoder]] or a direct converter function.
* @tparam T The expected type of the sink.
*/
-case class ForeachWriteSupportProvider[T](
+case class ForeachWriterTable[T](
writer: ForeachWriter[T],
converter: Either[ExpressionEncoder[T], InternalRow => T])
- extends StreamingWriteSupportProvider {
-
- override def createStreamingWriteSupport(
- queryId: String,
- schema: StructType,
- mode: OutputMode,
- options: DataSourceOptions): StreamingWriteSupport = {
- new StreamingWriteSupport {
- override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
- override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-
- override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
- val rowConverter: InternalRow => T = converter match {
- case Left(enc) =>
- val boundEnc = enc.resolveAndBind(
- schema.toAttributes,
- SparkSession.getActiveSession.get.sessionState.analyzer)
- boundEnc.fromRow
- case Right(func) =>
- func
- }
- ForeachWriterFactory(writer, rowConverter)
+ extends Table with SupportsStreamingWrite {
+
+ override def name(): String = "ForeachSink"
+
+ override def schema(): StructType = StructType(Nil)
+
+ override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ new WriteBuilder with SupportsOutputMode {
+ private var inputSchema: StructType = _
+
+ override def withInputDataSchema(schema: StructType): WriteBuilder = {
+ this.inputSchema = schema
+ this
}
- override def toString: String = "ForeachSink"
+ override def outputMode(mode: OutputMode): WriteBuilder = this
+
+ override def buildForStreaming(): StreamingWrite = {
+ new StreamingWrite {
+ override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+ override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+ override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
+ val rowConverter: InternalRow => T = converter match {
+ case Left(enc) =>
+ val boundEnc = enc.resolveAndBind(
+ inputSchema.toAttributes,
+ SparkSession.getActiveSession.get.sessionState.analyzer)
+ boundEnc.fromRow
+ case Right(func) =>
+ func
+ }
+ ForeachWriterFactory(writer, rowConverter)
+ }
+ }
+ }
}
}
}
-object ForeachWriteSupportProvider {
+object ForeachWriterTable {
def apply[T](
writer: ForeachWriter[T],
- encoder: ExpressionEncoder[T]): ForeachWriteSupportProvider[_] = {
+ encoder: ExpressionEncoder[T]): ForeachWriterTable[_] = {
writer match {
case pythonWriter: PythonForeachWriter =>
- new ForeachWriteSupportProvider[UnsafeRow](
+ new ForeachWriterTable[UnsafeRow](
pythonWriter, Right((x: InternalRow) => x.asInstanceOf[UnsafeRow]))
case _ =>
- new ForeachWriteSupportProvider[T](writer, Left(encoder))
+ new ForeachWriterTable[T](writer, Left(encoder))
}
}
}
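ForeachWriterTable backs Dataset.writeStream.foreach(...); roughly, in user code it is reached like this (the writer body is a placeholder):

    import org.apache.spark.sql.{ForeachWriter, Row}

    val query = df.writeStream
      .foreach(new ForeachWriter[Row] {
        override def open(partitionId: Long, epochId: Long): Boolean = true
        override def process(value: Row): Unit = println(value)        // placeholder side effect
        override def close(errorOrNull: Throwable): Unit = ()
      })
      .start()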
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
index 143235e..f395189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
/**
* A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It implements
* the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
* streaming write support.
*/
-class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWriteSupport) extends BatchWrite {
+class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWrite) extends BatchWrite {
override def commit(messages: Array[WriterCommitMessage]): Unit = {
writeSupport.commit(eppchId, messages)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 075c6b9..3a00825 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -40,8 +40,7 @@ import org.apache.spark.sql.types._
* generated rows. The source will try its best to reach `rowsPerSecond`, but the query may
* be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
*/
-class RateStreamProvider extends DataSourceV2
- with TableProvider with DataSourceRegister {
+class RateStreamProvider extends TableProvider with DataSourceRegister {
import RateStreamProvider._
override def getTable(options: DataSourceOptions): Table = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index c3b24a8..8ac5bfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -31,8 +31,7 @@ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
-class TextSocketSourceProvider extends DataSourceV2
- with TableProvider with DataSourceRegister with Logging {
+class TextSocketSourceProvider extends TableProvider with DataSourceRegister with Logging {
private def checkParameters(params: DataSourceOptions): Unit = {
logWarning("The socket source should not be used for production applications! " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index c50dc7b..3fc2cbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -32,9 +32,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -42,15 +42,31 @@ import org.apache.spark.sql.types.StructType
* A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
* tests and does not provide durability.
*/
-class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
- with MemorySinkBase with Logging {
-
- override def createStreamingWriteSupport(
- queryId: String,
- schema: StructType,
- mode: OutputMode,
- options: DataSourceOptions): StreamingWriteSupport = {
- new MemoryStreamingWriteSupport(this, mode, schema)
+class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Logging {
+
+ override def name(): String = "MemorySinkV2"
+
+ override def schema(): StructType = StructType(Nil)
+
+ override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ new WriteBuilder with SupportsOutputMode {
+ private var mode: OutputMode = _
+ private var inputSchema: StructType = _
+
+ override def outputMode(mode: OutputMode): WriteBuilder = {
+ this.mode = mode
+ this
+ }
+
+ override def withInputDataSchema(schema: StructType): WriteBuilder = {
+ this.inputSchema = schema
+ this
+ }
+
+ override def buildForStreaming(): StreamingWrite = {
+ new MemoryStreamingWrite(MemorySinkV2.this, mode, inputSchema)
+ }
+ }
}
private case class AddedData(batchId: Long, data: Array[Row])
@@ -122,9 +138,9 @@ class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
extends WriterCommitMessage {}
-class MemoryStreamingWriteSupport(
+class MemoryStreamingWrite(
val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
- extends StreamingWriteSupport {
+ extends StreamingWrite {
override def createStreamingWriterFactory: MemoryWriterFactory = {
MemoryWriterFactory(outputMode, schema)
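
With this change the memory sink receives its output mode through the SupportsOutputMode mix-in on the WriteBuilder instead of through createStreamingWriteSupport. The user-facing path is still format("memory"); a minimal sketch, assuming a streaming DataFrame `df` and a SparkSession `spark` (table name and mode are illustrative):

    // Illustrative only: the query name and output mode below end up in the
    // WriteBuilder shown above (queryName registers the in-memory table).
    val query = df.writeStream
      .format("memory")
      .queryName("mem_table")
      .outputMode("append")
      .start()

    spark.table("mem_table").show()   // inspect what the sink has collected so far
    query.stop()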
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 8666818..ef21caa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -173,7 +173,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
ds match {
case provider: TableProvider =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
- ds = provider, conf = sparkSession.sessionState.conf)
+ source = provider, conf = sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
val dsOptions = new DataSourceOptions(options.asJava)
val table = userSpecifiedSchema match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index ea596ba..9841994 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, TableProvider}
/**
* Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -278,7 +278,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
query
} else if (source == "foreach") {
assertNotPartitioned("foreach")
- val sink = ForeachWriteSupportProvider[T](foreachWriter, ds.exprEnc)
+ val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
df.sparkSession.sessionState.streamingQueryManager.startQuery(
extraOptions.get("queryName"),
extraOptions.get("checkpointLocation"),
@@ -304,30 +304,29 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
useTempCheckpointLocation = true,
trigger = trigger)
} else {
- val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
+ val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
- var options = extraOptions.toMap
- val sink = ds.getConstructor().newInstance() match {
- case w: StreamingWriteSupportProvider
- if !disabledSources.contains(w.getClass.getCanonicalName) =>
- val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
- w, df.sparkSession.sessionState.conf)
- options = sessionOptions ++ extraOptions
- w
- case _ =>
- val ds = DataSource(
- df.sparkSession,
- className = source,
- options = options,
- partitionColumns = normalizedParCols.getOrElse(Nil))
- ds.createSink(outputMode)
+ val useV1Source = disabledSources.contains(cls.getCanonicalName)
+
+ val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) {
+ val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+ val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+ source = provider, conf = df.sparkSession.sessionState.conf)
+ val options = sessionOptions ++ extraOptions
+ val dsOptions = new DataSourceOptions(options.asJava)
+ provider.getTable(dsOptions) match {
+ case s: SupportsStreamingWrite => s
+ case _ => createV1Sink()
+ }
+ } else {
+ createV1Sink()
}
df.sparkSession.sessionState.streamingQueryManager.startQuery(
- options.get("queryName"),
- options.get("checkpointLocation"),
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
df,
- options,
+ extraOptions.toMap,
sink,
outputMode,
useTempCheckpointLocation = source == "console" || source == "noop",
@@ -336,6 +335,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
}
+ private def createV1Sink(): BaseStreamingSink = {
+ val ds = DataSource(
+ df.sparkSession,
+ className = source,
+ options = extraOptions.toMap,
+ partitionColumns = normalizedParCols.getOrElse(Nil))
+ ds.createSink(outputMode)
+ }
+
/**
* Sets the output of the streaming query to be processed using the provided writer object.
* See [[org.apache.spark.sql.ForeachWriter]] for more details on the lifecycle and
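
As a note on the resolution logic above: a source whose Table implements SupportsStreamingWrite now takes the V2 write path, and the V1 fallback (createV1Sink) is used when the class is not a TableProvider or when it is listed in the conf behind disabledV2StreamingWriters. A sketch of forcing the fallback for one sink (the class name below is hypothetical):

    // Force a specific sink back onto the V1 createSink path; the class name is made up.
    spark.conf.set(
      "spark.sql.streaming.disabledV2Writers",
      "com.example.MyStreamingSinkProvider")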
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 0bd8a92..a7fa800 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution,
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
@@ -261,7 +261,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
(sink, trigger) match {
- case (v2Sink: StreamingWriteSupportProvider, trigger: ContinuousTrigger) =>
+ case (v2Sink: SupportsStreamingWrite, trigger: ContinuousTrigger) =>
if (operationCheckEnabled) {
UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
}
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index a36b0cf..914af58 100644
--- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -9,6 +9,6 @@ org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly
org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly
org.apache.spark.sql.streaming.sources.FakeReadBothModes
org.apache.spark.sql.streaming.sources.FakeReadNeitherMode
-org.apache.spark.sql.streaming.sources.FakeWriteSupportProvider
+org.apache.spark.sql.streaming.sources.FakeWriteOnly
org.apache.spark.sql.streaming.sources.FakeNoWrite
org.apache.spark.sql.streaming.sources.FakeWriteSupportProviderV1Fallback
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 6185736..e804377 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -43,9 +43,9 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
test("streaming writer") {
val sink = new MemorySinkV2
- val writeSupport = new MemoryStreamingWriteSupport(
+ val write = new MemoryStreamingWrite(
sink, OutputMode.Append(), new StructType().add("i", "int"))
- writeSupport.commit(0,
+ write.commit(0,
Array(
MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
@@ -53,7 +53,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
))
assert(sink.latestBatchId.contains(0))
assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7))
- writeSupport.commit(19,
+ write.commit(19,
Array(
MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))),
MemoryWriterCommitMessage(0, Seq(Row(33)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
index f903c17..0b1e3b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
@@ -33,8 +33,8 @@ class DataSourceV2UtilsSuite extends SparkFunSuite {
conf.setConfString(s"spark.sql.$keyPrefix.config.name", "false")
conf.setConfString("spark.datasource.another.config.name", "123")
conf.setConfString(s"spark.datasource.$keyPrefix.", "123")
- val cs = classOf[DataSourceV2WithSessionConfig].getConstructor().newInstance()
- val confs = DataSourceV2Utils.extractSessionConfigs(cs.asInstanceOf[DataSourceV2], conf)
+ val source = new DataSourceV2WithSessionConfig
+ val confs = DataSourceV2Utils.extractSessionConfigs(source, conf)
assert(confs.size == 2)
assert(confs.keySet.filter(_.startsWith("spark.datasource")).size == 0)
assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index daca65f..c56a545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -38,8 +38,7 @@ import org.apache.spark.util.SerializableConfiguration
* Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
* Each job moves files from `target/_temporary/uniqueId/` to `target`.
*/
-class SimpleWritableDataSource extends DataSourceV2
- with TableProvider with SessionConfigSupport {
+class SimpleWritableDataSource extends TableProvider with SessionConfigSupport {
private val tableSchema = new StructType().add("i", "long").add("j", "long")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index d3d210c..bad2259 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
@@ -43,7 +43,7 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
override def beforeEach(): Unit = {
super.beforeEach()
epochEndpoint = EpochCoordinatorRef.create(
- mock[StreamingWriteSupport],
+ mock[StreamingWrite],
mock[ContinuousStream],
mock[ContinuousExecution],
coordinatorId,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index a0b56ec..f74285f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.LocalSparkSession
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.test.TestSparkSession
class EpochCoordinatorSuite
@@ -40,13 +40,13 @@ class EpochCoordinatorSuite
private var epochCoordinator: RpcEndpointRef = _
- private var writeSupport: StreamingWriteSupport = _
+ private var writeSupport: StreamingWrite = _
private var query: ContinuousExecution = _
private var orderVerifier: InOrder = _
override def beforeEach(): Unit = {
val stream = mock[ContinuousStream]
- writeSupport = mock[StreamingWriteSupport]
+ writeSupport = mock[StreamingWrite]
query = mock[ContinuousExecution]
orderVerifier = inOrder(writeSupport, query)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 62f1666..c841793 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -71,13 +71,10 @@ trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
}
-trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
- override def createStreamingWriteSupport(
- queryId: String,
- schema: StructType,
- mode: OutputMode,
- options: DataSourceOptions): StreamingWriteSupport = {
- LastWriteOptions.options = options
+trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Seq())
+ override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
throw new IllegalStateException("fake sink - cannot actually write")
}
}
@@ -129,20 +126,33 @@ class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
}
}
-class FakeWriteSupportProvider
+class FakeWriteOnly
extends DataSourceRegister
- with FakeStreamingWriteSupportProvider
+ with TableProvider
with SessionConfigSupport {
override def shortName(): String = "fake-write-microbatch-continuous"
override def keyPrefix: String = shortName()
+
+ override def getTable(options: DataSourceOptions): Table = {
+ LastWriteOptions.options = options
+ new Table with FakeStreamingWriteTable {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Nil)
+ }
+ }
}
-class FakeNoWrite extends DataSourceRegister {
+class FakeNoWrite extends DataSourceRegister with TableProvider {
override def shortName(): String = "fake-write-neither-mode"
+ override def getTable(options: DataSourceOptions): Table = {
+ new Table {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Nil)
+ }
+ }
}
-
case class FakeWriteV1FallbackException() extends Exception
class FakeSink extends Sink {
@@ -150,17 +160,24 @@ class FakeSink extends Sink {
}
class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
- with FakeStreamingWriteSupportProvider with StreamSinkProvider {
+ with TableProvider with StreamSinkProvider {
override def createSink(
- sqlContext: SQLContext,
- parameters: Map[String, String],
- partitionColumns: Seq[String],
- outputMode: OutputMode): Sink = {
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ partitionColumns: Seq[String],
+ outputMode: OutputMode): Sink = {
new FakeSink()
}
override def shortName(): String = "fake-write-v1-fallback"
+
+ override def getTable(options: DataSourceOptions): Table = {
+ new Table with FakeStreamingWriteTable {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Nil)
+ }
+ }
}
object LastReadOptions {
@@ -260,7 +277,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
testPositiveCaseWithQuery(
"fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v2Query =>
assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
- .isInstanceOf[FakeWriteSupportProviderV1Fallback])
+ .isInstanceOf[Table])
}
// Ensure we create a V1 sink with the config. Note the config is a comma separated
@@ -319,19 +336,20 @@ class StreamingDataSourceV2Suite extends StreamTest {
for ((read, write, trigger) <- cases) {
testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
- val table = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+ val sourceTable = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+ .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
+
+ val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor()
.newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
- val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).
- getConstructor().newInstance()
- (table, writeSource, trigger) match {
+ (sourceTable, sinkTable, trigger) match {
// Valid microbatch queries.
- case (_: SupportsMicroBatchRead, _: StreamingWriteSupportProvider, t)
+ case (_: SupportsMicroBatchRead, _: SupportsStreamingWrite, t)
if !t.isInstanceOf[ContinuousTrigger] =>
testPositiveCase(read, write, trigger)
// Valid continuous queries.
- case (_: SupportsContinuousRead, _: StreamingWriteSupportProvider,
+ case (_: SupportsContinuousRead, _: SupportsStreamingWrite,
_: ContinuousTrigger) =>
testPositiveCase(read, write, trigger)
@@ -342,12 +360,12 @@ class StreamingDataSourceV2Suite extends StreamTest {
s"Data source $read does not support streamed reading")
// Invalid - can't write
- case (_, w, _) if !w.isInstanceOf[StreamingWriteSupportProvider] =>
+ case (_, w, _) if !w.isInstanceOf[SupportsStreamingWrite] =>
testNegativeCase(read, write, trigger,
s"Data source $write does not support streamed writing")
// Invalid - trigger is continuous but reader is not
- case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
+ case (r, _: SupportsStreamingWrite, _: ContinuousTrigger)
if !r.isInstanceOf[SupportsContinuousRead] =>
testNegativeCase(read, write, trigger,
s"Data source $read does not support continuous processing")