You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2021/04/26 23:08:26 UTC
[spark] branch branch-3.1 updated: [SPARK-28247][SS][TEST] Fix
flaky test "query without test harness" on ContinuousSuite
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 526490f [SPARK-28247][SS][TEST] Fix flaky test "query without test harness" on ContinuousSuite
526490f is described below
commit 526490fc2562b93046214e3f90eecee08722cd33
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Tue Apr 27 08:07:09 2021 +0900
[SPARK-28247][SS][TEST] Fix flaky test "query without test harness" on ContinuousSuite
### What changes were proposed in this pull request?
This is another attempt to fix the flaky test "query without test harness" on ContinuousSuite.
`query without test harness` is flaky because it starts a continuous query with two partitions but assumes they will run at the same speed.
In this test, 0 and 2 will be written to partition 0, and 1 and 3 will be written to partition 1. The test assumes that by the time we see 3, the value 2 has already been written to the memory sink, but this is not guaranteed. We can add `if (currentValue == 2) Thread.sleep(5000)` at this line https://github.com/apache/spark/blob/b2a2b5d8206b7c09b180b8b6363f73c6c3fdb1d8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala#L135 to reproduce the failure: `Result set Set [...]
The fix changes `waitForRateSourceCommittedValue` so that it waits until every partition has reached its desired value before the query is stopped.
### Why are the changes needed?
Fix a flaky test.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests. Manually verified that the reproduction described above no longer fails after this change.
Closes #32316 from zsxwing/SPARK-28247-fix.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
(cherry picked from commit 0df3b501aeb2a88997e5a68a6a8f8e7f5196342c)
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../sql/streaming/continuous/ContinuousSuite.scala | 22 +++++++++++++---------
1 file changed, 13 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 02f9139..0e2fcfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -54,27 +54,31 @@ class ContinuousSuiteBase extends StreamTest {
protected def waitForRateSourceCommittedValue(
query: ContinuousExecution,
- desiredValue: Long,
+ partitionIdToDesiredValue: Map[Int, Long],
maxWaitTimeMs: Long): Unit = {
- def readHighestCommittedValue(c: ContinuousExecution): Option[Long] = {
+ def readCommittedValues(c: ContinuousExecution): Option[Map[Int, Long]] = {
c.committedOffsets.lastOption.map { case (_, offset) =>
offset match {
case o: RateStreamOffset =>
- o.partitionToValueAndRunTimeMs.map {
- case (_, ValueRunTimeMsPair(value, _)) => value
- }.max
+ o.partitionToValueAndRunTimeMs.mapValues(_.value).toMap
}
}
}
+ def reachDesiredValues: Boolean = {
+ val committedValues = readCommittedValues(query).getOrElse(Map.empty)
+ partitionIdToDesiredValue.forall { case (key, value) =>
+ committedValues.contains(key) && committedValues(key) > value
+ }
+ }
+
val maxWait = System.currentTimeMillis() + maxWaitTimeMs
- while (System.currentTimeMillis() < maxWait &&
- readHighestCommittedValue(query).getOrElse(Long.MinValue) < desiredValue) {
+ while (System.currentTimeMillis() < maxWait && !reachDesiredValues) {
Thread.sleep(100)
}
if (System.currentTimeMillis() > maxWait) {
logWarning(s"Couldn't reach desired value in $maxWaitTimeMs milliseconds!" +
- s"Current highest committed value is ${readHighestCommittedValue(query)}")
+ s"Current committed values is ${readCommittedValues(query)}")
}
}
@@ -264,7 +268,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
val expected = Set(0, 1, 2, 3)
val continuousExecution =
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[ContinuousExecution]
- waitForRateSourceCommittedValue(continuousExecution, expected.max, 20 * 1000)
+ waitForRateSourceCommittedValue(continuousExecution, Map(0 -> 2, 1 -> 3), 20 * 1000)
query.stop()
val results = spark.read.table("noharness").collect()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org