Posted to commits@spark.apache.org by do...@apache.org on 2020/09/11 19:08:06 UTC

[spark] branch branch-2.4 updated: [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new be76ee9  [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
be76ee9 is described below

commit be76ee96bdb168dc2b44b25c01c3dd6cf7946f6b
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Fri Sep 11 11:48:34 2020 -0700

    [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
    
    This PR aims to add a `sinkParameters` field to check sink options robustly and independently in `DataStreamReaderWriterSuite`.
    
    `LastOptions.parameters` is designed to capture the options from three callbacks: `sourceSchema`, `createSource`, and `createSink`. However, `StreamingQuery.stop` invokes `queryExecutionThread.join` and `runStream`, which run `createSource` immediately and overwrite the options that `createSink` stored.
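    
    For illustration, here is a minimal, Spark-free sketch (hypothetical names, not the suite's actual code) of that last-writer-wins behavior: both callbacks write to one shared field, so whichever runs last wins.
    
    ```scala
    object ClobberDemo {
      // Single shared field, mirroring LastOptions.parameters before this patch.
      var parameters: Map[String, String] = null
    
      def createSink(options: Map[String, String]): Unit = parameters = options
      def createSource(options: Map[String, String]): Unit = parameters = options
    
      def main(args: Array[String]): Unit = {
        createSink(Map("opt1" -> "5")) // start() records the sink options...
        createSource(Map.empty)        // ...but stop() re-enters the source path
        assert(parameters.isEmpty)     // and the sink options are gone
      }
    }
    ```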
    
    To capture the `createSink` options, the test suite currently relies on the workaround pattern shown below, which we have observed to be flaky at times. If we store the `createSink` options in a separate field, the workaround becomes unnecessary and the flakiness is eliminated.
    
    ```scala
    val query = df.writeStream
      ...
      .start()
    assert(LastOptions.parameters(...))
    query.stop()
    ```
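    
    With the separate `sinkParameters` field introduced by this patch, the updated test can assert after `stop()` with no workaround (excerpt adapted from the diff below; `newMetadataDir` is a helper defined in the suite):
    
    ```scala
    df.writeStream
      .format("org.apache.spark.sql.streaming.test")
      .option("opt1", "5")
      .option("checkpointLocation", newMetadataDir)
      .start()
      .stop()
    
    // sinkParameters is written only by createSink, so stop() cannot clobber it.
    assert(LastOptions.sinkParameters("opt1") == "5")
    ```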
    
    No user-facing change; this is a test-only change.
    
    Tested by passing the newly updated test case.
    
    Closes #29730 from dongjoon-hyun/SPARK-32845.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../test/DataStreamReaderWriterSuite.scala         | 23 +++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 4744aca..38d5c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -43,11 +43,13 @@ object LastOptions {
   var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
   var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
   var parameters: Map[String, String] = null
+  var sinkParameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
 
   def clear(): Unit = {
     parameters = null
+    sinkParameters = null
     schema = null
     partitionColumns = null
     reset(mockStreamSourceProvider)
@@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       parameters: Map[String, String],
       partitionColumns: Seq[String],
       outputMode: OutputMode): Sink = {
-    LastOptions.parameters = parameters
+    LastOptions.sinkParameters = parameters
     LastOptions.partitionColumns = partitionColumns
     LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
     new Sink {
@@ -171,20 +173,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     LastOptions.clear()
 
-    val query = df.writeStream
+    df.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "5")
       .options(Map("opt2" -> "4"))
       .options(map)
       .option("checkpointLocation", newMetadataDir)
       .start()
+      .stop()
 
-    assert(LastOptions.parameters("opt1") == "5")
-    assert(LastOptions.parameters("opt2") == "4")
-    assert(LastOptions.parameters("opt3") == "3")
-    assert(LastOptions.parameters.contains("checkpointLocation"))
-
-    query.stop()
+    assert(LastOptions.sinkParameters("opt1") == "5")
+    assert(LastOptions.sinkParameters("opt2") == "4")
+    assert(LastOptions.sinkParameters("opt3") == "3")
+    assert(LastOptions.sinkParameters.contains("checkpointLocation"))
   }
 
   test("SPARK-32832: later option should override earlier options for load()") {
@@ -205,7 +206,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .load()
     assert(LastOptions.parameters.isEmpty)
 
-    val query = ds.writeStream
+    ds.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .option("paTh", "1")
@@ -214,8 +215,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .option("patH", "4")
       .option("path", "5")
       .start()
-    assert(LastOptions.parameters("path") == "5")
-    query.stop()
+      .stop()
+    assert(LastOptions.sinkParameters("path") == "5")
   }
 
   test("partitioning") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org