You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/07/13 08:23:27 UTC
[spark] branch branch-3.2 updated: [SPARK-36109][SS][TEST] Check
data after adding data to topic in KafkaSourceStressSuite
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 19478bd [SPARK-36109][SS][TEST] Check data after adding data to topic in KafkaSourceStressSuite
19478bd is described below
commit 19478bdf52e834b2f18050654fa5651139f28071
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Jul 13 01:21:32 2021 -0700
[SPARK-36109][SS][TEST] Check data after adding data to topic in KafkaSourceStressSuite
### What changes were proposed in this pull request?
This patch proposes checking the data right after adding data to a topic in `KafkaSourceStressSuite`.
### Why are the changes needed?
The test logic in `KafkaSourceStressSuite` is not stable; for an example failure, see https://github.com/apache/spark/runs/3049244904.
If we add data to a topic and then delete that topic before checking the data, the expected answer differs from the data retrieved from the sink.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #33311 from viirya/stream-assert.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 201566cdd556f362aaf7cae21d961b23065c022b)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 5 +++--
.../src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala | 6 +++++-
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 714da92..d9fad5e 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -2429,8 +2429,9 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
(d, running) => {
Random.nextInt(5) match {
case 0 => // Add a new topic
- topics = topics ++ Seq(newStressTopic)
- AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newStressTopic",
+ val newTopic = newStressTopic
+ topics = topics ++ Seq(newTopic)
+ AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newTopic",
topicAction = (topic, partition) => {
if (partition.isEmpty) {
testUtils.createTopic(topic, partitions = nextInt(1, 6))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 624b630..ff182b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -283,7 +283,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
/** Assert that a condition on the active query is true */
class AssertOnQuery(val condition: StreamExecution => Boolean, val message: String)
- extends StreamAction {
+ extends StreamAction with StreamMustBeRunning {
override def toString: String = s"AssertOnQuery(<condition>, $message)"
}
@@ -871,6 +871,10 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
case r if r < 0.7 => // AddData
addRandomData()
+ // In some suites, e.g. `KafkaSourceStressSuite`, we delete Kafka topic in the
+ // `addData` closure. In the case, the topic with added data might be deleted
+ // before next check. So we must check data after adding data here.
+ addCheck()
case _ => // StopStream
addCheck()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org