You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/09/11 19:08:06 UTC
[spark] branch branch-2.4 updated: [SPARK-32845][SS][TESTS] Add
sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new be76ee9 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
be76ee9 is described below
commit be76ee96bdb168dc2b44b25c01c3dd6cf7946f6b
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Fri Sep 11 11:48:34 2020 -0700
[SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
This PR aims to add `sinkParameters` to check sink options robustly and independently in DataStreamReaderWriterSuite.
`LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, and `createSource` immediately, which resets the options stored by `createSink`.
To catch `createSink` options, the test suite currently uses a workaround pattern. However, we have occasionally observed flakiness with this pattern. If we store the `createSink` options separately, we no longer need this workaround and can eliminate the flakiness.
```scala
val query = df.writeStream.
...
.start()
assert(LastOptions.parameters(..))
query.stop()
```
No. This is a test-only change.
Pass the newly updated test case.
Closes #29730 from dongjoon-hyun/SPARK-32845.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../test/DataStreamReaderWriterSuite.scala | 23 +++++++++++-----------
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 4744aca..38d5c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -43,11 +43,13 @@ object LastOptions {
var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
var parameters: Map[String, String] = null
+ var sinkParameters: Map[String, String] = null
var schema: Option[StructType] = null
var partitionColumns: Seq[String] = Nil
def clear(): Unit = {
parameters = null
+ sinkParameters = null
schema = null
partitionColumns = null
reset(mockStreamSourceProvider)
@@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
- LastOptions.parameters = parameters
+ LastOptions.sinkParameters = parameters
LastOptions.partitionColumns = partitionColumns
LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
new Sink {
@@ -171,20 +173,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
LastOptions.clear()
- val query = df.writeStream
+ df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("opt1", "5")
.options(Map("opt2" -> "4"))
.options(map)
.option("checkpointLocation", newMetadataDir)
.start()
+ .stop()
- assert(LastOptions.parameters("opt1") == "5")
- assert(LastOptions.parameters("opt2") == "4")
- assert(LastOptions.parameters("opt3") == "3")
- assert(LastOptions.parameters.contains("checkpointLocation"))
-
- query.stop()
+ assert(LastOptions.sinkParameters("opt1") == "5")
+ assert(LastOptions.sinkParameters("opt2") == "4")
+ assert(LastOptions.sinkParameters("opt3") == "3")
+ assert(LastOptions.sinkParameters.contains("checkpointLocation"))
}
test("SPARK-32832: later option should override earlier options for load()") {
@@ -205,7 +206,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
.load()
assert(LastOptions.parameters.isEmpty)
- val query = ds.writeStream
+ ds.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.option("paTh", "1")
@@ -214,8 +215,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
.option("patH", "4")
.option("path", "5")
.start()
- assert(LastOptions.parameters("path") == "5")
- query.stop()
+ .stop()
+ assert(LastOptions.sinkParameters("path") == "5")
}
test("partitioning") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org