You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2021/04/26 23:40:12 UTC

[spark] branch branch-3.0 updated: [SPARK-28247][SS][TEST] Fix flaky test "query without test harness" on ContinuousSuite

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ae57b2a  [SPARK-28247][SS][TEST] Fix flaky test "query without test harness" on ContinuousSuite
ae57b2a is described below

commit ae57b2aa4f135ea8962b7764039c333ac2fabeab
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Tue Apr 27 08:07:09 2021 +0900

    [SPARK-28247][SS][TEST] Fix flaky test "query without test harness" on ContinuousSuite
    
    ### What changes were proposed in this pull request?
    
    This is another attempt to fix the flaky test "query without test harness" on ContinuousSuite.
    
    `query without test harness` is flaky because it starts a continuous query with two partitions but assumes they will run at the same speed.
    
    In this test, 0 and 2 will be written to partition 0, while 1 and 3 will be written to partition 1. The test assumes that by the time we see 3, 2 has already been written to the memory sink, but this is not guaranteed. We can add `if (currentValue == 2) Thread.sleep(5000)` at this line https://github.com/apache/spark/blob/b2a2b5d8206b7c09b180b8b6363f73c6c3fdb1d8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala#L135 to reproduce the failure: `Result set Set [...]
    
    The fix is changing `waitForRateSourceCommittedValue` to wait until all partitions reach the desired values before stopping the query.
    
    ### Why are the changes needed?
    
    Fix a flaky test.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests. I also manually verified that the reproduction mentioned above no longer fails after this change.
    
    Closes #32316 from zsxwing/SPARK-28247-fix.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
    (cherry picked from commit 0df3b501aeb2a88997e5a68a6a8f8e7f5196342c)
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/streaming/continuous/ContinuousSuite.scala | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 0d17f2e..320f8ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -55,27 +55,31 @@ class ContinuousSuiteBase extends StreamTest {
 
   protected def waitForRateSourceCommittedValue(
       query: ContinuousExecution,
-      desiredValue: Long,
+      partitionIdToDesiredValue: Map[Int, Long],
       maxWaitTimeMs: Long): Unit = {
-    def readHighestCommittedValue(c: ContinuousExecution): Option[Long] = {
+    def readCommittedValues(c: ContinuousExecution): Option[Map[Int, Long]] = {
       c.committedOffsets.lastOption.map { case (_, offset) =>
         offset match {
           case o: RateStreamOffset =>
-            o.partitionToValueAndRunTimeMs.map {
-              case (_, ValueRunTimeMsPair(value, _)) => value
-            }.max
+            o.partitionToValueAndRunTimeMs.mapValues(_.value).toMap
         }
       }
     }
 
+    def reachDesiredValues: Boolean = {
+      val committedValues = readCommittedValues(query).getOrElse(Map.empty)
+      partitionIdToDesiredValue.forall { case (key, value) =>
+        committedValues.contains(key) && committedValues(key) > value
+      }
+    }
+
     val maxWait = System.currentTimeMillis() + maxWaitTimeMs
-    while (System.currentTimeMillis() < maxWait &&
-      readHighestCommittedValue(query).getOrElse(Long.MinValue) < desiredValue) {
+    while (System.currentTimeMillis() < maxWait && !reachDesiredValues) {
       Thread.sleep(100)
     }
     if (System.currentTimeMillis() > maxWait) {
       logWarning(s"Couldn't reach desired value in $maxWaitTimeMs milliseconds!" +
-        s"Current highest committed value is ${readHighestCommittedValue(query)}")
+        s"Current committed values is ${readCommittedValues(query)}")
     }
   }
 
@@ -265,7 +269,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val expected = Set(0, 1, 2, 3)
     val continuousExecution =
       query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[ContinuousExecution]
-    waitForRateSourceCommittedValue(continuousExecution, expected.max, 20 * 1000)
+    waitForRateSourceCommittedValue(continuousExecution, Map(0 -> 2, 1 -> 3), 20 * 1000)
     query.stop()
 
     val results = spark.read.table("noharness").collect()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org