Posted to commits@spark.apache.org by we...@apache.org on 2018/01/29 05:10:44 UTC
spark git commit: [SPARK-23196] Unify continuous and microbatch V2 sinks
Repository: spark
Updated Branches:
refs/heads/master 686a622c9 -> 49b0207dc
[SPARK-23196] Unify continuous and microbatch V2 sinks
## What changes were proposed in this pull request?
Replace the two streaming V2 sink interfaces (MicroBatchWriteSupport and ContinuousWriteSupport) with a unified StreamWriteSupport interface, with a shim so microbatch execution can use it as well.
Add a new SQL config, spark.sql.streaming.disabledV2Writers, for disabling V2 sinks and falling back to the V1 sink implementation.
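For illustration, a custom source opts into the unified sink API by implementing the new interface, and users can force a given source back onto the V1 path through the new config. The sketch below mirrors the ConsoleSinkProvider changes in this patch; the provider, writer class, and register class names are hypothetical and not part of this commit:

    import org.apache.spark.sql.sources.DataSourceRegister
    import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
    import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType

    // Minimal sketch of a StreamWriteSupport implementation (hypothetical names).
    class MySinkProvider extends DataSourceV2 with StreamWriteSupport with DataSourceRegister {
      override def shortName(): String = "mysink"

      override def createStreamWriter(
          queryId: String,
          schema: StructType,
          mode: OutputMode,
          options: DataSourceV2Options): StreamWriter = {
        // A single StreamWriter serves both continuous and microbatch execution;
        // microbatch execution wraps it in a MicroBatchWriter shim internally.
        new MyStreamWriter(schema, options)  // MyStreamWriter is hypothetical
      }
    }

    // Fall back to the V1 sink for a given register class (class name hypothetical):
    spark.conf.set("spark.sql.streaming.disabledV2Writers", "org.example.MySinkProvider")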
## How was this patch tested?
Existing tests; for Kafka (the only existing continuous V2 sink), these now exercise the V2 path for microbatch execution as well.
Author: Jose Torres <jo...@databricks.com>
Closes #20369 from jose-torres/streaming-sink.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49b0207d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49b0207d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49b0207d
Branch: refs/heads/master
Commit: 49b0207dc9327989c72700b4d04d2a714c92e159
Parents: 686a622
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Jan 29 13:10:38 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jan 29 13:10:38 2018 +0800
----------------------------------------------------------------------
.../sql/kafka010/KafkaContinuousWriter.scala | 119 -------------------
.../sql/kafka010/KafkaSourceProvider.scala | 16 +--
.../spark/sql/kafka010/KafkaStreamWriter.scala | 115 ++++++++++++++++++
.../sql/kafka010/KafkaContinuousSinkSuite.scala | 8 +-
.../spark/sql/kafka010/KafkaSinkSuite.scala | 14 ++-
.../spark/sql/kafka010/KafkaSourceSuite.scala | 8 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 9 ++
.../v2/streaming/ContinuousWriteSupport.java | 56 ---------
.../v2/streaming/MicroBatchWriteSupport.java | 60 ----------
.../v2/streaming/StreamWriteSupport.java | 54 +++++++++
.../v2/streaming/writer/ContinuousWriter.java | 44 -------
.../v2/streaming/writer/StreamWriter.java | 72 +++++++++++
.../sources/v2/writer/DataSourceV2Writer.java | 4 +-
.../datasources/v2/WriteToDataSourceV2.scala | 11 +-
.../streaming/MicroBatchExecution.scala | 19 +--
.../spark/sql/execution/streaming/console.scala | 27 ++---
.../continuous/ContinuousExecution.scala | 19 ++-
.../streaming/continuous/EpochCoordinator.scala | 9 +-
.../streaming/sources/ConsoleWriter.scala | 59 ++-------
.../streaming/sources/MicroBatchWriter.scala | 54 +++++++++
.../execution/streaming/sources/memoryV2.scala | 29 ++---
.../spark/sql/streaming/DataStreamWriter.scala | 10 +-
.../sql/streaming/StreamingQueryManager.scala | 9 +-
....apache.spark.sql.sources.DataSourceRegister | 7 +-
.../execution/streaming/MemorySinkV2Suite.scala | 2 +-
.../sources/StreamingDataSourceV2Suite.scala | 112 ++++++++---------
26 files changed, 457 insertions(+), 489 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
deleted file mode 100644
index 9843f46..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
-import scala.collection.JavaConverters._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
-import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY}
-import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
-import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
-
-/**
- * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
- * don't need to really send one.
- */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
-
-/**
- * A [[ContinuousWriter]] for Kafka writing. Responsible for generating the writer factory.
- * @param topic The topic this writer is responsible for. If None, topic will be inferred from
- * a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-class KafkaContinuousWriter(
- topic: Option[String], producerParams: Map[String, String], schema: StructType)
- extends ContinuousWriter with SupportsWriteInternalRow {
-
- validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
-
- override def createInternalRowWriterFactory(): KafkaContinuousWriterFactory =
- KafkaContinuousWriterFactory(topic, producerParams, schema)
-
- override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
- override def abort(messages: Array[WriterCommitMessage]): Unit = {}
-}
-
-/**
- * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate
- * the per-task data writers.
- * @param topic The topic that should be written to. If None, topic will be inferred from
- * a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-case class KafkaContinuousWriterFactory(
- topic: Option[String], producerParams: Map[String, String], schema: StructType)
- extends DataWriterFactory[InternalRow] {
-
- override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
- new KafkaContinuousDataWriter(topic, producerParams, schema.toAttributes)
- }
-}
-
-/**
- * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
- * process incoming rows.
- *
- * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
- * from a `topic` field in the incoming data.
- * @param producerParams Parameters to use for the Kafka producer.
- * @param inputSchema The attributes in the input data.
- */
-class KafkaContinuousDataWriter(
- targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
- extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
- import scala.collection.JavaConverters._
-
- private lazy val producer = CachedKafkaProducer.getOrCreate(
- new java.util.HashMap[String, Object](producerParams.asJava))
-
- def write(row: InternalRow): Unit = {
- checkForErrors()
- sendRow(row, producer)
- }
-
- def commit(): WriterCommitMessage = {
- // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
- // This requires flushing and then checking that no callbacks produced errors.
- // We also check for errors before to fail as soon as possible - the check is cheap.
- checkForErrors()
- producer.flush()
- checkForErrors()
- KafkaWriterCommitMessage
- }
-
- def abort(): Unit = {}
-
- def close(): Unit = {
- checkForErrors()
- if (producer != null) {
- producer.flush()
- checkForErrors()
- CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 62a998f..2deb7fa 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -28,11 +28,11 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source}
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -46,7 +46,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
- with ContinuousWriteSupport
+ with StreamWriteSupport
with ContinuousReadSupport
with Logging {
import KafkaSourceProvider._
@@ -223,11 +223,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}
- override def createContinuousWriter(
+ override def createStreamWriter(
queryId: String,
schema: StructType,
mode: OutputMode,
- options: DataSourceV2Options): Optional[ContinuousWriter] = {
+ options: DataSourceV2Options): StreamWriter = {
import scala.collection.JavaConverters._
val spark = SparkSession.getActiveSession.get
@@ -238,7 +238,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
KafkaWriter.validateQuery(
schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)
- Optional.of(new KafkaContinuousWriter(topic, producerParams, schema))
+ new KafkaStreamWriter(topic, producerParams, schema)
}
private def strategy(caseInsensitiveParams: Map[String, String]) =
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
new file mode 100644
index 0000000..a24efde
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
+ * don't need to really send one.
+ */
+case object KafkaWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[StreamWriter]] for Kafka writing. Responsible for generating the writer factory.
+ *
+ * @param topic The topic this writer is responsible for. If None, topic will be inferred from
+ * a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+class KafkaStreamWriter(
+ topic: Option[String], producerParams: Map[String, String], schema: StructType)
+ extends StreamWriter with SupportsWriteInternalRow {
+
+ validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
+
+ override def createInternalRowWriterFactory(): KafkaStreamWriterFactory =
+ KafkaStreamWriterFactory(topic, producerParams, schema)
+
+ override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+ override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+/**
+ * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate
+ * the per-task data writers.
+ * @param topic The topic that should be written to. If None, topic will be inferred from
+ * a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+case class KafkaStreamWriterFactory(
+ topic: Option[String], producerParams: Map[String, String], schema: StructType)
+ extends DataWriterFactory[InternalRow] {
+
+ override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+ new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
+ }
+}
+
+/**
+ * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
+ * process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
+ * from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+class KafkaStreamDataWriter(
+ targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
+ extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+ import scala.collection.JavaConverters._
+
+ private lazy val producer = CachedKafkaProducer.getOrCreate(
+ new java.util.HashMap[String, Object](producerParams.asJava))
+
+ def write(row: InternalRow): Unit = {
+ checkForErrors()
+ sendRow(row, producer)
+ }
+
+ def commit(): WriterCommitMessage = {
+ // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
+ // This requires flushing and then checking that no callbacks produced errors.
+ // We also check for errors before to fail as soon as possible - the check is cheap.
+ checkForErrors()
+ producer.flush()
+ checkForErrors()
+ KafkaWriterCommitMessage
+ }
+
+ def abort(): Unit = {}
+
+ def close(): Unit = {
+ checkForErrors()
+ if (producer != null) {
+ producer.flush()
+ checkForErrors()
+ CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index 8487a69..fc890a0 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -18,16 +18,14 @@
package org.apache.spark.sql.kafka010
import java.util.Locale
-import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.time.SpanSugar._
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
-import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{BinaryType, DataType}
import org.apache.spark.util.Utils
@@ -362,7 +360,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
+ assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
}
test("streaming - exception on config serializer") {
@@ -424,7 +422,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
val inputSchema = Seq(AttributeReference("value", BinaryType)())
val data = new Array[Byte](15000) // large value
- val writeTask = new KafkaContinuousDataWriter(Some(topic), options.asScala.toMap, inputSchema)
+ val writeTask = new KafkaStreamDataWriter(Some(topic), options.asScala.toMap, inputSchema)
try {
val fieldTypes: Array[DataType] = Array(BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 2ab336c..42f8b4c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -336,27 +336,31 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
+ assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
}
test("streaming - exception on config serializer") {
val input = MemoryStream[String]
var writer: StreamingQuery = null
var ex: Exception = null
- ex = intercept[IllegalArgumentException] {
+ ex = intercept[StreamingQueryException] {
writer = createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
+ input.addData("1")
+ writer.processAllAvailable()
}
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))
- ex = intercept[IllegalArgumentException] {
+ ex = intercept[StreamingQueryException] {
writer = createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
+ input.addData("1")
+ writer.processAllAvailable()
}
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index c4cb1bc..02c8764 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -29,19 +29,17 @@ import scala.util.Random
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
-import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.{Dataset, ForeachWriter}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
-import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryWriter
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 76b9d6f..2c70b00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1127,6 +1127,13 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(100)
+ val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
+ .internal()
+ .doc("A comma-separated list of fully qualified data source register class names for which" +
+ " StreamWriteSupport is disabled. Writes to these sources will fail back to the V1 Sink.")
+ .stringConf
+ .createWithDefault("")
+
object PartitionOverwriteMode extends Enumeration {
val STATIC, DYNAMIC = Value
}
@@ -1494,6 +1501,8 @@ class SQLConf extends Serializable with Logging {
def continuousStreamingExecutorPollIntervalMs: Long =
getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)
+ def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)
+
def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
deleted file mode 100644
index dee493c..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for continuous stream processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousWriteSupport extends BaseStreamingSink {
-
- /**
- * Creates an optional {@link ContinuousWriter} to save the data to this data source. Data
- * sources can return None if there is no writing needed to be done.
- *
- * @param queryId A unique string for the writing query. It's possible that there are many
- * writing queries running at the same time, and the returned
- * {@link DataSourceV2Writer} can use this id to distinguish itself from others.
- * @param schema the schema of the data to be written.
- * @param mode the output mode which determines what successive epoch output means to this
- * sink, please refer to {@link OutputMode} for more details.
- * @param options the options for the returned data source writer, which is an immutable
- * case-insensitive string-to-string map.
- */
- Optional<ContinuousWriter> createContinuousWriter(
- String queryId,
- StructType schema,
- OutputMode mode,
- DataSourceV2Options options);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
deleted file mode 100644
index 53ffa95..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability and save the data from a microbatch to the data source.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchWriteSupport extends BaseStreamingSink {
-
- /**
- * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
- * sources can return None if there is no writing needed to be done.
- *
- * @param queryId A unique string for the writing query. It's possible that there are many writing
- * queries running at the same time, and the returned {@link DataSourceV2Writer}
- * can use this id to distinguish itself from others.
- * @param epochId The unique numeric ID of the batch within this writing query. This is an
- * incrementing counter representing a consistent set of data; the same batch may
- * be started multiple times in failure recovery scenarios, but it will always
- * contain the same records.
- * @param schema the schema of the data to be written.
- * @param mode the output mode which determines what successive batch output means to this
- * sink, please refer to {@link OutputMode} for more details.
- * @param options the options for the returned data source writer, which is an immutable
- * case-insensitive string-to-string map.
- */
- Optional<DataSourceV2Writer> createMicroBatchWriter(
- String queryId,
- long epochId,
- StructType schema,
- OutputMode mode,
- DataSourceV2Options options);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
new file mode 100644
index 0000000..6cd219c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability for structured streaming.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriteSupport extends BaseStreamingSink {
+
+ /**
+ * Creates an optional {@link StreamWriter} to save the data to this data source. Data
+ * sources can return None if there is no writing needed to be done.
+ *
+ * @param queryId A unique string for the writing query. It's possible that there are many
+ * writing queries running at the same time, and the returned
+ * {@link DataSourceV2Writer} can use this id to distinguish itself from others.
+ * @param schema the schema of the data to be written.
+ * @param mode the output mode which determines what successive epoch output means to this
+ * sink, please refer to {@link OutputMode} for more details.
+ * @param options the options for the returned data source writer, which is an immutable
+ * case-insensitive string-to-string map.
+ */
+ StreamWriter createStreamWriter(
+ String queryId,
+ StructType schema,
+ OutputMode mode,
+ DataSourceV2Options options);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
deleted file mode 100644
index 723395b..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.writer;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-
-/**
- * A {@link DataSourceV2Writer} for use with continuous stream processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousWriter extends DataSourceV2Writer {
- /**
- * Commits this writing job for the specified epoch with a list of commit messages. The commit
- * messages are collected from successful data writers and are produced by
- * {@link DataWriter#commit()}.
- *
- * If this method fails (by throwing an exception), this writing job is considered to have been
- * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
- */
- void commit(long epochId, WriterCommitMessage[] messages);
-
- default void commit(WriterCommitMessage[] messages) {
- throw new UnsupportedOperationException(
- "Commit without epoch should not be called with ContinuousWriter");
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
new file mode 100644
index 0000000..3156c88
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+/**
+ * A {@link DataSourceV2Writer} for use with structured streaming. This writer handles commits and
+ * aborts relative to an epoch ID determined by the execution engine.
+ *
+ * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
+ * and so must reset any internal state after a successful commit.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriter extends DataSourceV2Writer {
+ /**
+ * Commits this writing job for the specified epoch with a list of commit messages. The commit
+ * messages are collected from successful data writers and are produced by
+ * {@link DataWriter#commit()}.
+ *
+ * If this method fails (by throwing an exception), this writing job is considered to have been
+ * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
+ *
+ * To support exactly-once processing, writer implementations should ensure that this method is
+ * idempotent. The execution engine may call commit() multiple times for the same epoch
+ * in some circumstances.
+ */
+ void commit(long epochId, WriterCommitMessage[] messages);
+
+ /**
+ * Aborts this writing job because some data writers are failed and keep failing when retry, or
+ * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
+ *
+ * If this method fails (by throwing an exception), the underlying data source may require manual
+ * cleanup.
+ *
+ * Unless the abort is triggered by the failure of commit, the given messages should have some
+ * null slots as there maybe only a few data writers that are committed before the abort
+ * happens, or some data writers were committed but their commit messages haven't reached the
+ * driver when the abort is triggered. So this is just a "best effort" for data sources to
+ * clean up the data left by data writers.
+ */
+ void abort(long epochId, WriterCommitMessage[] messages);
+
+ default void commit(WriterCommitMessage[] messages) {
+ throw new UnsupportedOperationException(
+ "Commit without epoch should not be called with StreamWriter");
+ }
+
+ default void abort(WriterCommitMessage[] messages) {
+ throw new UnsupportedOperationException(
+ "Abort without epoch should not be called with StreamWriter");
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
index f1ef411..8048f50 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
@@ -28,9 +28,7 @@ import org.apache.spark.sql.types.StructType;
/**
* A data source writer that is returned by
* {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}/
- * {@link org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport#createMicroBatchWriter(
- * String, long, StructType, OutputMode, DataSourceV2Options)}/
- * {@link org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport#createContinuousWriter(
+ * {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter(
* String, StructType, OutputMode, DataSourceV2Options)}.
* It can mix in various writing optimization interfaces to speed up the data saving. The actual
* writing logic is delegated to {@link DataWriter}.
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 3dbdae7..cd6b3e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -26,9 +26,8 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -62,7 +61,9 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
try {
val runTask = writer match {
- case w: ContinuousWriter =>
+ // This case means that we're doing continuous processing. In microbatch streaming, the
+ // StreamWriter is wrapped in a MicroBatchWriter, which is executed as a normal batch.
+ case w: StreamWriter =>
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
sparkContext.env)
@@ -82,13 +83,13 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
(index, message: WriterCommitMessage) => messages(index) = message
)
- if (!writer.isInstanceOf[ContinuousWriter]) {
+ if (!writer.isInstanceOf[StreamWriter]) {
logInfo(s"Data source writer $writer is committing.")
writer.commit(messages)
logInfo(s"Data source writer $writer committed.")
}
} catch {
- case _: InterruptedException if writer.isInstanceOf[ContinuousWriter] =>
+ case _: InterruptedException if writer.isInstanceOf[StreamWriter] =>
// Interruption is how continuous queries are ended, so accept and ignore the exception.
case cause: Throwable =>
logError(s"Data source writer $writer is aborting.")
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 7c38045..9759752 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -28,9 +28,11 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Curre
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.{Clock, Utils}
@@ -440,15 +442,18 @@ class MicroBatchExecution(
val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
- case s: MicroBatchWriteSupport =>
- val writer = s.createMicroBatchWriter(
+ case s: StreamWriteSupport =>
+ val writer = s.createStreamWriter(
s"$runId",
- currentBatchId,
newAttributePlan.schema,
outputMode,
new DataSourceV2Options(extraOptions.asJava))
- assert(writer.isPresent, "microbatch writer must always be present")
- WriteToDataSourceV2(writer.get, newAttributePlan)
+ if (writer.isInstanceOf[SupportsWriteInternalRow]) {
+ WriteToDataSourceV2(
+ new InternalRowMicroBatchWriter(currentBatchId, writer), newAttributePlan)
+ } else {
+ WriteToDataSourceV2(new MicroBatchWriter(currentBatchId, writer), newAttributePlan)
+ }
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
@@ -471,7 +476,7 @@ class MicroBatchExecution(
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
sink match {
case s: Sink => s.addBatch(currentBatchId, nextBatch)
- case s: MicroBatchWriteSupport =>
+ case _: StreamWriteSupport =>
// This doesn't accumulate any data - it just forces execution of the microbatch writer.
nextBatch.collect()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index f2aa325..d5ac0bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -17,15 +17,12 @@
package org.apache.spark.sql.execution.streaming
-import java.util.Optional
-
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.sources.{ConsoleContinuousWriter, ConsoleMicroBatchWriter, ConsoleWriter}
+import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -35,26 +32,16 @@ case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
}
class ConsoleSinkProvider extends DataSourceV2
- with MicroBatchWriteSupport
- with ContinuousWriteSupport
+ with StreamWriteSupport
with DataSourceRegister
with CreatableRelationProvider {
- override def createMicroBatchWriter(
- queryId: String,
- batchId: Long,
- schema: StructType,
- mode: OutputMode,
- options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
- Optional.of(new ConsoleMicroBatchWriter(batchId, schema, options))
- }
-
- override def createContinuousWriter(
+ override def createStreamWriter(
queryId: String,
schema: StructType,
mode: OutputMode,
- options: DataSourceV2Options): Optional[ContinuousWriter] = {
- Optional.of(new ConsoleContinuousWriter(schema, options))
+ options: DataSourceV2Options): StreamWriter = {
+ new ConsoleWriter(schema, options)
}
def createRelation(
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 462e7d9..60f880f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -24,17 +24,16 @@ import java.util.function.UnaryOperator
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
-import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, Offset, PartitionOffset}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{Clock, Utils}
@@ -44,7 +43,7 @@ class ContinuousExecution(
name: String,
checkpointRoot: String,
analyzedPlan: LogicalPlan,
- sink: ContinuousWriteSupport,
+ sink: StreamWriteSupport,
trigger: Trigger,
triggerClock: Clock,
outputMode: OutputMode,
@@ -195,12 +194,12 @@ class ContinuousExecution(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
}
- val writer = sink.createContinuousWriter(
+ val writer = sink.createStreamWriter(
s"$runId",
triggerLogicalPlan.schema,
outputMode,
new DataSourceV2Options(extraOptions.asJava))
- val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
+ val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
val reader = withSink.collect {
case DataSourceV2Relation(_, r: ContinuousReader) => r
@@ -230,7 +229,7 @@ class ContinuousExecution(
// Use the parent Spark session for the endpoint since it's where this query ID is registered.
val epochEndpoint =
EpochCoordinatorRef.create(
- writer.get(), reader, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+ writer, reader, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
val epochUpdateThread = new Thread(new Runnable {
override def run: Unit = {
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index 90b3584..84d2621 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -17,17 +17,14 @@
package org.apache.spark.sql.execution.streaming.continuous
-import java.util.concurrent.atomic.AtomicLong
-
import scala.collection.mutable
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.util.RpcUtils
@@ -85,7 +82,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
* Create a reference to a new [[EpochCoordinator]].
*/
def create(
- writer: ContinuousWriter,
+ writer: StreamWriter,
reader: ContinuousReader,
query: ContinuousExecution,
epochCoordinatorId: String,
@@ -118,7 +115,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
* have both committed and reported an end offset for a given epoch.
*/
private[continuous] class EpochCoordinator(
- writer: ContinuousWriter,
+ writer: StreamWriter,
reader: ContinuousReader,
query: ContinuousExecution,
startEpoch: Long,
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
index 6fb61df..7c1700f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
-import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType
/** Common methods used to create writes for the the console sink */
-trait ConsoleWriter extends Logging {
-
- def options: DataSourceV2Options
+class ConsoleWriter(schema: StructType, options: DataSourceV2Options)
+ extends StreamWriter with Logging {
// Number of rows to display, by default 20 rows
protected val numRowsToShow = options.getInt("numRows", 20)
@@ -40,14 +39,20 @@ trait ConsoleWriter extends Logging {
def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
- def abort(messages: Array[WriterCommitMessage]): Unit = {}
+ override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+ // We have to print a "Batch" label for the epoch for compatibility with the pre-data source V2
+ // behavior.
+ printRows(messages, schema, s"Batch: $epochId")
+ }
+
+ def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
protected def printRows(
commitMessages: Array[WriterCommitMessage],
schema: StructType,
printMessage: String): Unit = {
val rows = commitMessages.collect {
- case PackedRowCommitMessage(rows) => rows
+ case PackedRowCommitMessage(rs) => rs
}.flatten
// scalastyle:off println
@@ -59,46 +64,8 @@ trait ConsoleWriter extends Logging {
.createDataFrame(spark.sparkContext.parallelize(rows), schema)
.show(numRowsToShow, isTruncated)
}
-}
-
-
-/**
- * A [[DataSourceV2Writer]] that collects results from a micro-batch query to the driver and
- * prints them in the console. Created by
- * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
- *
- * This sink should not be used for production, as it requires sending all rows to the driver
- * and does not support recovery.
- */
-class ConsoleMicroBatchWriter(batchId: Long, schema: StructType, val options: DataSourceV2Options)
- extends DataSourceV2Writer with ConsoleWriter {
-
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
- printRows(messages, schema, s"Batch: $batchId")
- }
-
- override def toString(): String = {
- s"ConsoleMicroBatchWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
- }
-}
-
-
-/**
- * A [[DataSourceV2Writer]] that collects results from a continuous query to the driver and
- * prints them in the console. Created by
- * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
- *
- * This sink should not be used for production, as it requires sending all rows to the driver
- * and does not support recovery.
- */
-class ConsoleContinuousWriter(schema: StructType, val options: DataSourceV2Options)
- extends ContinuousWriter with ConsoleWriter {
-
- override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
- printRows(messages, schema, s"Continuous processing epoch $epochId")
- }
override def toString(): String = {
- s"ConsoleContinuousWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+ s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
}
}
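A minimal usage sketch of the unified writer (illustrative values only; it assumes an
active SparkSession, since printRows builds a DataFrame to render the rows). The same
ConsoleWriter instance now serves both modes: continuous execution calls the epoch-based
commit directly, while microbatch execution goes through the batch-ID shim added in
MicroBatchWriter.scala below.

    import java.util.Collections

    import org.apache.spark.sql.execution.streaming.sources.{ConsoleWriter, MicroBatchWriter}
    import org.apache.spark.sql.sources.v2.DataSourceV2Options
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val schema = StructType(StructField("value", IntegerType) :: Nil)
    val options = new DataSourceV2Options(Collections.emptyMap[String, String]())
    val consoleWriter = new ConsoleWriter(schema, options)

    // Continuous execution: one commit per epoch; output is labeled "Batch: <epochId>".
    consoleWriter.commit(5, Array.empty[WriterCommitMessage])

    // Microbatch execution: the shim forwards the fixed batch ID to the same commit.
    new MicroBatchWriter(5, consoleWriter).commit(Array.empty[WriterCommitMessage])
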
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
new file mode 100644
index 0000000..d7f3ba8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
+
+/**
+ * A [[DataSourceV2Writer]] used to hook V2 stream writers into a microbatch plan. It implements
+ * the non-streaming interface, forwarding the batch ID determined at construction to a wrapped
+ * streaming writer.
+ */
+class MicroBatchWriter(batchId: Long, writer: StreamWriter) extends DataSourceV2Writer {
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ writer.commit(batchId, messages)
+ }
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = writer.abort(batchId, messages)
+
+ override def createWriterFactory(): DataWriterFactory[Row] = writer.createWriterFactory()
+}
+
+class InternalRowMicroBatchWriter(batchId: Long, writer: StreamWriter)
+ extends DataSourceV2Writer with SupportsWriteInternalRow {
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ writer.commit(batchId, messages)
+ }
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = writer.abort(batchId, messages)
+
+ override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] =
+ writer match {
+ case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+ case _ => throw new IllegalStateException(
+ "InternalRowMicroBatchWriter should only be created with base writer support")
+ }
+}
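The shim is intended to be re-applied on every trigger. A hedged sketch of how an execution
engine could pick the right wrapper for a given batch (the helper name writerForBatch is
illustrative and not part of the patch; it only uses the classes defined above):

    import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
    import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
    import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, SupportsWriteInternalRow}

    // Re-wrap a long-lived StreamWriter with the current batch ID, preferring the
    // InternalRow path when the underlying writer supports it.
    def writerForBatch(streamWriter: StreamWriter, batchId: Long): DataSourceV2Writer =
      streamWriter match {
        case s: SupportsWriteInternalRow => new InternalRowMicroBatchWriter(batchId, s)
        case _ => new MicroBatchWriter(batchId, streamWriter)
      }
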
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index da7c31c..ce55e44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -30,8 +30,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -40,24 +40,13 @@ import org.apache.spark.sql.types.StructType
* A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
* tests and does not provide durability.
*/
-class MemorySinkV2 extends DataSourceV2
- with MicroBatchWriteSupport with ContinuousWriteSupport with Logging {
-
- override def createMicroBatchWriter(
+class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging {
+ override def createStreamWriter(
queryId: String,
- batchId: Long,
schema: StructType,
mode: OutputMode,
- options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = {
- java.util.Optional.of(new MemoryWriter(this, batchId, mode))
- }
-
- override def createContinuousWriter(
- queryId: String,
- schema: StructType,
- mode: OutputMode,
- options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = {
- java.util.Optional.of(new ContinuousMemoryWriter(this, mode))
+ options: DataSourceV2Options): StreamWriter = {
+ new MemoryStreamWriter(this, mode)
}
private case class AddedData(batchId: Long, data: Array[Row])
@@ -141,8 +130,8 @@ class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode)
}
}
-class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode)
- extends ContinuousWriter {
+class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
+ extends StreamWriter {
override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode)
@@ -153,7 +142,7 @@ class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode)
sink.write(epochId, outputMode, newRows)
}
- override def abort(messages: Array[WriterCommitMessage]): Unit = {
+ override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
// Don't accept any of the new input.
}
}
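As a quick sanity sketch (values illustrative; it mirrors the pattern the updated
MemorySinkV2Suite exercises below), the renamed writer commits by epoch in both
execution modes:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.streaming.sources.{MemorySinkV2, MemoryStreamWriter, MemoryWriterCommitMessage}
    import org.apache.spark.sql.streaming.OutputMode

    val sink = new MemorySinkV2
    val writer = new MemoryStreamWriter(sink, OutputMode.Append())

    // One commit per epoch; partition 0 contributes two rows here.
    writer.commit(0, Array(MemoryWriterCommitMessage(0, Seq(Row(1), Row(2)))))
    // The sink now holds Row(1) and Row(2) under epoch 0.
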
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index d24f0dd..3b5b30d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.sources.{MemoryPlanV2, MemorySinkV2}
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
/**
* Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -281,11 +281,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
trigger = trigger)
} else {
val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
- val sink = (ds.newInstance(), trigger) match {
- case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
- case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException(
- s"Data source $source does not support continuous writing")
- case (w: MicroBatchWriteSupport, _) => w
+ val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
+ val sink = ds.newInstance() match {
+ case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
case _ =>
val ds = DataSource(
df.sparkSession,
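From the user side, a hedged sketch of the new fallback knob (the format name and class are
the test fakes registered later in this patch; the checkpoint path and the spark and df
handles are assumed). Listing a writer's fully qualified class name in the comma-separated
conf makes this branch skip StreamWriteSupport and fall back to the V1 Sink:

    import org.apache.spark.sql.internal.SQLConf

    // Assumes an existing SparkSession `spark` and a streaming DataFrame `df`.
    spark.conf.set(
      SQLConf.DISABLED_V2_STREAMING_WRITERS.key,
      "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback")

    df.writeStream
      .format("fake-write-v1-fallback")
      .option("checkpointLocation", "/tmp/assumed-checkpoint")  // placeholder path
      .start()
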
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 4b27e0d..fdd709c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger}
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
@@ -241,7 +241,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
(sink, trigger) match {
- case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) =>
+ case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
new StreamingQueryWrapper(new ContinuousExecution(
sparkSession,
@@ -254,7 +254,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
outputMode,
extraOptions,
deleteCheckpointOnStop))
- case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
+ case _ =>
new StreamingQueryWrapper(new MicroBatchExecution(
sparkSession,
userSpecifiedName.orNull,
@@ -266,9 +266,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
outputMode,
extraOptions,
deleteCheckpointOnStop))
- case (_: ContinuousWriteSupport, t) if !t.isInstanceOf[ContinuousTrigger] =>
- throw new AnalysisException(
- "Sink only supports continuous writes, but a continuous trigger was not specified.")
}
}
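For third-party sinks, a minimal sketch (class and short name hypothetical) of what now
satisfies both branches of this dispatch: a single StreamWriteSupport implementation is
picked up for microbatch execution and, when paired with a ContinuousTrigger, for
continuous execution as well. The memory writer is reused purely for illustration.

    import org.apache.spark.sql.execution.streaming.sources.{MemorySinkV2, MemoryStreamWriter}
    import org.apache.spark.sql.sources.DataSourceRegister
    import org.apache.spark.sql.sources.v2.DataSourceV2Options
    import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
    import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType

    // Hypothetical sink: one createStreamWriter covers both execution modes.
    class ExampleUnifiedSink extends DataSourceRegister with StreamWriteSupport {
      override def shortName(): String = "example-unified-sink"

      override def createStreamWriter(
          queryId: String,
          schema: StructType,
          mode: OutputMode,
          options: DataSourceV2Options): StreamWriter = {
        new MemoryStreamWriter(new MemorySinkV2, mode)
      }
    }
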
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index a0b25b4..46b38be 100644
--- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -9,7 +9,6 @@ org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly
org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly
org.apache.spark.sql.streaming.sources.FakeReadBothModes
org.apache.spark.sql.streaming.sources.FakeReadNeitherMode
-org.apache.spark.sql.streaming.sources.FakeWriteMicroBatchOnly
-org.apache.spark.sql.streaming.sources.FakeWriteContinuousOnly
-org.apache.spark.sql.streaming.sources.FakeWriteBothModes
-org.apache.spark.sql.streaming.sources.FakeWriteNeitherMode
+org.apache.spark.sql.streaming.sources.FakeWrite
+org.apache.spark.sql.streaming.sources.FakeNoWrite
+org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 00d4f0b..9be22d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -40,7 +40,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
test("continuous writer") {
val sink = new MemorySinkV2
- val writer = new ContinuousMemoryWriter(sink, OutputMode.Append())
+ val writer = new MemoryStreamWriter(sink, OutputMode.Append())
writer.commit(0,
Array(
MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
http://git-wip-us.apache.org/repos/asf/spark/blob/49b0207d/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index f152174..d4f8bae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -19,18 +19,18 @@ package org.apache.spark.sql.streaming.sources
import java.util.Optional
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.streaming.{LongOffset, RateStreamOffset}
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper}
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.reader.ReadTask
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport, MicroBatchReadSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
-import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
-import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -64,23 +64,12 @@ trait FakeContinuousReadSupport extends ContinuousReadSupport {
options: DataSourceV2Options): ContinuousReader = FakeReader()
}
-trait FakeMicroBatchWriteSupport extends MicroBatchWriteSupport {
- def createMicroBatchWriter(
+trait FakeStreamWriteSupport extends StreamWriteSupport {
+ override def createStreamWriter(
queryId: String,
- epochId: Long,
schema: StructType,
mode: OutputMode,
- options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
- throw new IllegalStateException("fake sink - cannot actually write")
- }
-}
-
-trait FakeContinuousWriteSupport extends ContinuousWriteSupport {
- def createContinuousWriter(
- queryId: String,
- schema: StructType,
- mode: OutputMode,
- options: DataSourceV2Options): Optional[ContinuousWriter] = {
+ options: DataSourceV2Options): StreamWriter = {
throw new IllegalStateException("fake sink - cannot actually write")
}
}
@@ -102,23 +91,36 @@ class FakeReadNeitherMode extends DataSourceRegister {
override def shortName(): String = "fake-read-neither-mode"
}
-class FakeWriteMicroBatchOnly extends DataSourceRegister with FakeMicroBatchWriteSupport {
- override def shortName(): String = "fake-write-microbatch-only"
+class FakeWrite extends DataSourceRegister with FakeStreamWriteSupport {
+ override def shortName(): String = "fake-write-microbatch-continuous"
}
-class FakeWriteContinuousOnly extends DataSourceRegister with FakeContinuousWriteSupport {
- override def shortName(): String = "fake-write-continuous-only"
+class FakeNoWrite extends DataSourceRegister {
+ override def shortName(): String = "fake-write-neither-mode"
}
-class FakeWriteBothModes extends DataSourceRegister
- with FakeMicroBatchWriteSupport with FakeContinuousWriteSupport {
- override def shortName(): String = "fake-write-microbatch-continuous"
+
+case class FakeWriteV1FallbackException() extends Exception
+
+class FakeSink extends Sink {
+ override def addBatch(batchId: Long, data: DataFrame): Unit = {}
}
-class FakeWriteNeitherMode extends DataSourceRegister {
- override def shortName(): String = "fake-write-neither-mode"
+class FakeWriteV1Fallback extends DataSourceRegister
+ with FakeStreamWriteSupport with StreamSinkProvider {
+
+ override def createSink(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ partitionColumns: Seq[String],
+ outputMode: OutputMode): Sink = {
+ new FakeSink()
+ }
+
+ override def shortName(): String = "fake-write-v1-fallback"
}
+
class StreamingDataSourceV2Suite extends StreamTest {
override def beforeAll(): Unit = {
@@ -133,8 +135,6 @@ class StreamingDataSourceV2Suite extends StreamTest {
"fake-read-microbatch-continuous",
"fake-read-neither-mode")
val writeFormats = Seq(
- "fake-write-microbatch-only",
- "fake-write-continuous-only",
"fake-write-microbatch-continuous",
"fake-write-neither-mode")
val triggers = Seq(
@@ -151,6 +151,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
.trigger(trigger)
.start()
query.stop()
+ query
}
private def testNegativeCase(
@@ -184,6 +185,24 @@ class StreamingDataSourceV2Suite extends StreamTest {
}
}
+ test("disabled v2 write") {
+ // Ensure the V2 path works normally and generates a V2 sink.
+ val v2Query = testPositiveCase(
+ "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
+ assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+ .isInstanceOf[FakeWriteV1Fallback])
+
+ // Ensure we create a V1 sink with the config. Note the config is a comma separated
+ // list, including other fake entries.
+ val fullSinkName = "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback"
+ withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") {
+ val v1Query = testPositiveCase(
+ "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
+ assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+ .isInstanceOf[FakeSink])
+ }
+ }
+
// Get a list of (read, write, trigger) tuples for test cases.
val cases = readFormats.flatMap { read =>
writeFormats.flatMap { write =>
@@ -199,12 +218,12 @@ class StreamingDataSourceV2Suite extends StreamTest {
val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).newInstance()
(readSource, writeSource, trigger) match {
// Valid microbatch queries.
- case (_: MicroBatchReadSupport, _: MicroBatchWriteSupport, t)
+ case (_: MicroBatchReadSupport, _: StreamWriteSupport, t)
if !t.isInstanceOf[ContinuousTrigger] =>
testPositiveCase(read, write, trigger)
// Valid continuous queries.
- case (_: ContinuousReadSupport, _: ContinuousWriteSupport, _: ContinuousTrigger) =>
+ case (_: ContinuousReadSupport, _: StreamWriteSupport, _: ContinuousTrigger) =>
testPositiveCase(read, write, trigger)
// Invalid - can't read at all
@@ -214,31 +233,18 @@ class StreamingDataSourceV2Suite extends StreamTest {
testNegativeCase(read, write, trigger,
s"Data source $read does not support streamed reading")
- // Invalid - trigger is continuous but writer is not
- case (_, w, _: ContinuousTrigger) if !w.isInstanceOf[ContinuousWriteSupport] =>
- testNegativeCase(read, write, trigger,
- s"Data source $write does not support continuous writing")
-
- // Invalid - can't write at all
- case (_, w, _)
- if !w.isInstanceOf[MicroBatchWriteSupport]
- && !w.isInstanceOf[ContinuousWriteSupport] =>
+ // Invalid - can't write
+ case (_, w, _) if !w.isInstanceOf[StreamWriteSupport] =>
testNegativeCase(read, write, trigger,
s"Data source $write does not support streamed writing")
- // Invalid - trigger and writer are continuous but reader is not
- case (r, _: ContinuousWriteSupport, _: ContinuousTrigger)
+ // Invalid - trigger is continuous but reader is not
+ case (r, _: StreamWriteSupport, _: ContinuousTrigger)
if !r.isInstanceOf[ContinuousReadSupport] =>
testNegativeCase(read, write, trigger,
s"Data source $read does not support continuous processing")
- // Invalid - trigger is microbatch but writer is not
- case (_, w, t)
- if !w.isInstanceOf[MicroBatchWriteSupport] && !t.isInstanceOf[ContinuousTrigger] =>
- testNegativeCase(read, write, trigger,
- s"Data source $write does not support streamed writing")
-
- // Invalid - trigger and writer are microbatch but reader is not
+ // Invalid - trigger is microbatch but reader is not
case (r, _, t)
if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] =>
testPostCreationNegativeCase(read, write, trigger,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org