Posted to commits@spark.apache.org by we...@apache.org on 2019/03/13 11:48:22 UTC
[spark] branch master updated: [SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d3813d8 [SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution
d3813d8 is described below
commit d3813d8b210d127ea278015ef27ead9348365787
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Mar 13 19:47:54 2019 +0800
[SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution
## What changes were proposed in this pull request?
According to the [design](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing), the life cycle of `StreamingWrite` should match that of the read-side `MicroBatchStream`/`ContinuousStream`: one instance per run of the stream query, rather than one per epoch.
This PR fixes it.
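To make the contract concrete, here is a minimal, self-contained sketch of the intended lifecycle (the `Sink`/`StreamingWrite` stand-ins below are hypothetical, not the real DSv2 API):

```scala
// Hypothetical stand-ins for the DSv2 write-side interfaces.
trait StreamingWrite { def commit(epochId: Long): Unit }

class Sink {
  // Potentially expensive: may validate schemas, open connections, etc.
  def newStreamingWrite(queryId: String): StreamingWrite = new StreamingWrite {
    override def commit(epochId: Long): Unit =
      println(s"query=$queryId committed epoch $epochId")
  }
}

object LifecycleSketch extends App {
  val sink = new Sink
  // After this patch: the write is created once, when the query run starts...
  val write = sink.newStreamingWrite(queryId = "q-1")
  // ...and reused for every epoch, instead of being re-created inside the loop.
  (0L until 3L).foreach(write.commit)
}
```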
## How was this patch tested?
Existing tests.
Closes #23981 from cloud-fan/dsv2.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/kafka010/KafkaContinuousSinkSuite.scala | 101 +++++----------------
.../execution/streaming/MicroBatchExecution.scala | 18 ++--
.../sql/execution/streaming/StreamExecution.scala | 2 +-
.../streaming/continuous/ContinuousExecution.scala | 20 ++--
.../sources/WriteToMicroBatchDataSource.scala | 39 ++++++++
.../sources/StreamingDataSourceV2Suite.scala | 18 +++-
6 files changed, 104 insertions(+), 94 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index b21037b..3c3aeeb 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -22,9 +22,8 @@ import java.util.Locale
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.time.SpanSugar._
-import scala.collection.JavaConverters._
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -227,39 +226,23 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)
- /* No topic field or topic option */
- var writer: StreamingQuery = null
- var ex: Exception = null
- try {
- writer = createKafkaWriter(input.toDF())(
+ val ex = intercept[AnalysisException] {
+ /* No topic field or topic option */
+ createKafkaWriter(input.toDF())(
withSelectExpr = "value as key", "value"
)
- testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
- eventually(timeout(streamingTimeout)) {
- assert(writer.exception.isDefined)
- ex = writer.exception.get
- }
- } finally {
- writer.stop()
}
assert(ex.getMessage
.toLowerCase(Locale.ROOT)
.contains("topic option required when no 'topic' attribute is present"))
- try {
+ val ex2 = intercept[AnalysisException] {
/* No value field */
- writer = createKafkaWriter(input.toDF())(
+ createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "value as key"
)
- testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
- eventually(timeout(streamingTimeout)) {
- assert(writer.exception.isDefined)
- ex = writer.exception.get
- }
- } finally {
- writer.stop()
}
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"required attribute 'value' not found"))
}
@@ -278,53 +261,30 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)
- var writer: StreamingQuery = null
- var ex: Exception = null
- try {
+ val ex = intercept[AnalysisException] {
/* topic field wrong type */
- writer = createKafkaWriter(input.toDF())(
+ createKafkaWriter(input.toDF())(
withSelectExpr = s"CAST('1' as INT) as topic", "value"
)
- testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
- eventually(timeout(streamingTimeout)) {
- assert(writer.exception.isDefined)
- ex = writer.exception.get
- }
- } finally {
- writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
- try {
+ val ex2 = intercept[AnalysisException] {
/* value field wrong type */
- writer = createKafkaWriter(input.toDF())(
+ createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
)
- testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
- eventually(timeout(streamingTimeout)) {
- assert(writer.exception.isDefined)
- ex = writer.exception.get
- }
- } finally {
- writer.stop()
}
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"value attribute type must be a string or binary"))
- try {
+ val ex3 = intercept[AnalysisException] {
/* key field wrong type */
- writer = createKafkaWriter(input.toDF())(
+ createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
)
- testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
- eventually(timeout(streamingTimeout)) {
- assert(writer.exception.isDefined)
- ex = writer.exception.get
- }
- } finally {
- writer.stop()
}
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binary"))
}
@@ -369,35 +329,22 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
- var writer: StreamingQuery = null
- var ex: Exception = null
- try {
- writer = createKafkaWriter(
+
+ val ex = intercept[IllegalArgumentException] {
+ createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
- eventually(timeout(streamingTimeout)) {
- assert(writer.exception.isDefined)
- ex = writer.exception.get
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
- "kafka option 'key.serializer' is not supported"))
- } finally {
- writer.stop()
}
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ "kafka option 'key.serializer' is not supported"))
- try {
- writer = createKafkaWriter(
+ val ex2 = intercept[IllegalArgumentException] {
+ createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
- eventually(timeout(streamingTimeout)) {
- assert(writer.exception.isDefined)
- ex = writer.exception.get
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
- "kafka option 'value.serializer' is not supported"))
- } finally {
- writer.stop()
}
+ assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
+ "kafka option 'value.serializer' is not supported"))
}
test("generic - write big data with small producer buffer") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index de7cbe2..bedcb9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatch
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchStream}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
@@ -122,7 +122,14 @@ class MicroBatchExecution(
case r: StreamingDataSourceV2Relation => r.stream
}
uniqueSources = sources.distinct
- _logicalPlan
+
+ sink match {
+ case s: SupportsStreamingWrite =>
+ val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
+ WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)
+
+ case _ => _logicalPlan
+ }
}
/**
@@ -513,9 +520,8 @@ class MicroBatchExecution(
val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
- case s: SupportsStreamingWrite =>
- val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
- WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
+ case _: SupportsStreamingWrite =>
+ newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
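In other words, `logicalPlan` now yields a reusable template that already contains the per-run `StreamingWrite`, and each micro-batch only specializes it with the current batch id. A toy sketch of that template/specialize split (hypothetical plan types, not Spark's):

```scala
// Hypothetical plan nodes illustrating the template/specialize split.
sealed trait Plan
case object Scan extends Plan
case class PhysicalWrite(writeTag: String, child: Plan) extends Plan
case class WriteTemplate(writeTag: String, child: Plan) extends Plan {
  // Cheap per-batch step: tag the shared write with a batch id.
  def createPlan(batchId: Long): PhysicalWrite =
    PhysicalWrite(s"$writeTag@batch=$batchId", child)
}

val template = WriteTemplate("kafka-write", Scan) // built once per query run
val batch0   = template.createPlan(0L)            // built once per micro-batch
val batch1   = template.createPlan(1L)
```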
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bba640e..180a23c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -585,7 +585,7 @@ abstract class StreamExecution(
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite = {
val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
- .withQueryId(runId.toString)
+ .withQueryId(id.toString)
.withInputDataSchema(inputPlan.schema)
outputMode match {
case Append =>
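Note the switch from `runId` to `id` when tagging the writer: in the public `StreamingQuery` API, `id` is persisted in the checkpoint and stays stable across restarts, while `runId` is regenerated on every start. An illustrative snippet showing the distinction (the checkpoint path is an assumed scratch location):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("idVsRunId").getOrCreate()

val query = spark.readStream.format("rate").load()
  .writeStream.format("console")
  .option("checkpointLocation", "/tmp/id-vs-runid-demo") // assumption: a scratch path
  .start()

println(query.id)    // stable across restarts from the same checkpoint
println(query.runId) // fresh UUID for every (re)start
query.stop()
```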
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index aef556d..f55a45d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -61,7 +61,7 @@ class ContinuousExecution(
// Throwable that caused the execution to fail
private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null)
- override val logicalPlan: LogicalPlan = {
+ override val logicalPlan: WriteToContinuousDataSource = {
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
var nextSourceId = 0
val _logicalPlan = analyzedPlan.transform {
@@ -88,7 +88,8 @@ class ContinuousExecution(
}
uniqueSources = sources.distinct
- _logicalPlan
+ WriteToContinuousDataSource(
+ createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan)
}
private val triggerExecutor = trigger match {
@@ -178,13 +179,10 @@ class ContinuousExecution(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
}
- val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
- val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)
-
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionForQuery,
- planWithSink,
+ withNewSources,
outputMode,
checkpointFile("state"),
id,
@@ -194,7 +192,7 @@ class ContinuousExecution(
lastExecution.executedPlan // Force the lazy generation of execution plan
}
- val stream = planWithSink.collect {
+ val stream = withNewSources.collect {
case relation: StreamingDataSourceV2Relation =>
relation.stream.asInstanceOf[ContinuousStream]
}.head
@@ -215,7 +213,13 @@ class ContinuousExecution(
// Use the parent Spark session for the endpoint since it's where this query ID is registered.
val epochEndpoint = EpochCoordinatorRef.create(
- streamingWrite, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+ logicalPlan.write,
+ stream,
+ this,
+ epochCoordinatorId,
+ currentBatchId,
+ sparkSession,
+ SparkEnv.get)
val epochUpdateThread = new Thread(new Runnable {
override def run: Unit = {
try {
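For continuous processing the same idea applies: the per-run write now lives on the logical plan itself (`WriteToContinuousDataSource.write`), and consumers such as the epoch coordinator read it from the plan rather than receiving a freshly created instance. A minimal sketch of that ownership shift (hypothetical types):

```scala
// Hypothetical types: the plan node owns the one write for the whole run.
trait StreamingWrite
case class WriteToContinuousDataSourceSketch(write: StreamingWrite, query: String)

def createEpochCoordinator(write: StreamingWrite): String =
  s"coordinator bound to $write" // every epoch commits through this same write

val plan = WriteToContinuousDataSourceSketch(new StreamingWrite {}, query = "rate-scan")
println(createEpochCoordinator(plan.write))
```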
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala
new file mode 100644
index 0000000..a3f58fa
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
+
+/**
+ * The logical plan for writing data to a micro-batch stream.
+ *
+ * Note that this logical plan does not have a corresponding physical plan, as it will be converted
+ * to [[WriteToDataSourceV2]] with [[MicroBatchWrite]] before execution.
+ */
+case class WriteToMicroBatchDataSource(write: StreamingWrite, query: LogicalPlan)
+ extends LogicalPlan {
+ override def children: Seq[LogicalPlan] = Seq(query)
+ override def output: Seq[Attribute] = Nil
+
+ def createPlan(batchId: Long): WriteToDataSourceV2 = {
+ WriteToDataSourceV2(new MicroBatchWrite(batchId, write), query)
+ }
+}
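A hypothetical usage sketch of the new node, mirroring the `MicroBatchExecution` change above (`streamingWrite` and `analyzedPlan` are assumed to exist in driver scope):

```scala
// Built once per query run, while constructing logicalPlan:
//   val template = WriteToMicroBatchDataSource(streamingWrite, analyzedPlan)
// Built once per micro-batch, inside runBatch:
//   val batchPlan = template.createPlan(currentBatchId)
// createPlan only wraps the shared StreamingWrite in a MicroBatchWrite tagged
// with the batch id; no new StreamingWrite is constructed per batch.
```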
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 97b694e..3c2c700 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -59,6 +60,19 @@ class FakeScanBuilder extends ScanBuilder with Scan {
override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream
}
+class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
+ override def buildForStreaming(): StreamingWrite = this
+ override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
+ throw new IllegalStateException("fake sink - cannot actually write")
+ }
+ override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+ throw new IllegalStateException("fake sink - cannot actually write")
+ }
+ override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+ throw new IllegalStateException("fake sink - cannot actually write")
+ }
+}
+
trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
@@ -75,7 +89,7 @@ trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
- throw new IllegalStateException("fake sink - cannot actually write")
+ new FakeWriteBuilder
}
}
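The `FakeWriteBuilder` change follows directly from the new lifecycle: the write is built eagerly at query start, so a fake sink must succeed at construction time and defer its "cannot actually write" failure to the actual write path. The general pattern, as a sketch (hypothetical types):

```scala
// Sketch: construction must succeed (it now runs at query start); only use throws.
trait WriterFactory
class FakeStreamingWriteSketch {
  def createWriterFactory(): WriterFactory =
    throw new IllegalStateException("fake sink - cannot actually write")
}

val write = new FakeStreamingWriteSketch // fine: query start only builds it
// write.createWriterFactory()           // would throw only when a batch writes
```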