You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/07/13 08:22:13 UTC

[spark] branch master updated: [SPARK-36109][SS][TEST] Check data after adding data to topic in KafkaSourceStressSuite

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 201566c  [SPARK-36109][SS][TEST] Check data after adding data to topic in KafkaSourceStressSuite
201566c is described below

commit 201566cdd556f362aaf7cae21d961b23065c022b
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Jul 13 01:21:32 2021 -0700

    [SPARK-36109][SS][TEST] Check data after adding data to topic in KafkaSourceStressSuite
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to check data after adding data to topic in `KafkaSourceStressSuite`.
    
    ### Why are the changes needed?
    
    The test logic in `KafkaSourceStressSuite` is not stable. For an example of a failed run, see https://github.com/apache/spark/runs/3049244904.
    
    If we add data to a topic and then delete the topic before checking the data, the expected answer differs from the data retrieved from the sink.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #33311 from viirya/stream-assert.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 5 +++--
 .../src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala  | 6 +++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 714da92..d9fad5e 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -2429,8 +2429,9 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
       (d, running) => {
         Random.nextInt(5) match {
           case 0 => // Add a new topic
-            topics = topics ++ Seq(newStressTopic)
-            AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newStressTopic",
+            val newTopic = newStressTopic
+            topics = topics ++ Seq(newTopic)
+            AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newTopic",
               topicAction = (topic, partition) => {
                 if (partition.isEmpty) {
                   testUtils.createTopic(topic, partitions = nextInt(1, 6))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 624b630..ff182b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -283,7 +283,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
 
   /** Assert that a condition on the active query is true */
   class AssertOnQuery(val condition: StreamExecution => Boolean, val message: String)
-    extends StreamAction {
+    extends StreamAction with StreamMustBeRunning {
     override def toString: String = s"AssertOnQuery(<condition>, $message)"
   }
 
@@ -871,6 +871,10 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
 
           case r if r < 0.7 => // AddData
             addRandomData()
+            // In some suites, e.g. `KafkaSourceStressSuite`, we delete Kafka topic in the
+            // `addData` closure. In the case, the topic with added data might be deleted
+            // before next check. So we must check data after adding data here.
+            addCheck()
 
           case _ => // StopStream
             addCheck()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org