Posted to commits@spark.apache.org by ho...@apache.org on 2020/05/23 01:20:15 UTC

[spark] branch master updated: [SPARK-31791][CORE][TEST] Improve cache block migration test reliability

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 721cba5  [SPARK-31791][CORE][TEST] Improve cache block migration test reliability
721cba5 is described below

commit 721cba540292d8d76102b18922dabe2a7d918dc5
Author: Holden Karau <hk...@apple.com>
AuthorDate: Fri May 22 18:19:41 2020 -0700

    [SPARK-31791][CORE][TEST] Improve cache block migration test reliability
    
    ### What changes were proposed in this pull request?
    
    Increase the timeouts and register the listener earlier, so that the job cannot start before the listener is registered.
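
    A minimal sketch of that ordering, assuming an existing `SparkContext` named `sc` as in the suite (the listener/semaphore pattern mirrors the test; this is an illustration, not code added by the patch):

    ```
    import java.util.concurrent.Semaphore

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

    val sem = new Semaphore(0)
    // Register the listener *before* submitting any job so that the
    // job-start event cannot be missed.
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        sem.release()
      }
    })
    // Only submit work once the listener is in place.
    val asyncCount = sc.parallelize(1 to 10).countAsync()
    sem.acquire(1) // blocks until the job has actually started
    ```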
    
    ### Why are the changes needed?
    
    The test is currently semi-flaky.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    I'm currently running the following bash script on my dev machine to verify that the flakiness decreases. It has reached 356 iterations without any test failures, so I believe the issue is fixed.
    
    ```
    set -ex
    ./build/sbt clean compile package
    # Plain assignments are used so that `set -e` does not abort the script when an
    # arithmetic expression such as ((failures=0)) evaluates to zero.
    failures=0
    for (( i=0; i<1000; ++i )); do
      echo "Run $i"
      failed=0
      ./build/sbt "core/testOnly org.apache.spark.scheduler.WorkerDecommissionSuite" || failed=1
      echo "Resulted in $failed"
      failures=$((failures+failed))
      echo "Current status is failures: $failures out of $((i+1)) runs"
    done
    ```
    
    Closes #28614 from holdenk/SPARK-31791-improve-cache-block-migration-test-reliability.
    
    Authored-by: Holden Karau <hk...@apple.com>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../spark/scheduler/WorkerDecommissionSuite.scala  | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index 8c6f86a..148d20e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.Semaphore
 import scala.concurrent.TimeoutException
 import scala.concurrent.duration._
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite,
+  TestUtils}
 import org.apache.spark.internal.config
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}
@@ -48,12 +49,6 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
 
   test("verify a task with all workers decommissioned succeeds") {
     val input = sc.parallelize(1 to 10)
-    // Do a count to wait for the executors to be registered.
-    input.count()
-    val sleepyRdd = input.mapPartitions{ x =>
-      Thread.sleep(50)
-      x
-    }
     // Listen for the job
     val sem = new Semaphore(0)
     sc.addSparkListener(new SparkListener {
@@ -61,22 +56,31 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
         sem.release()
       }
     })
+    TestUtils.waitUntilExecutorsUp(sc = sc,
+      numExecutors = 2,
+      timeout = 10000) // 10s
+    val sleepyRdd = input.mapPartitions{ x =>
+      Thread.sleep(5000) // 5s
+      x
+    }
     // Start the task.
     val asyncCount = sleepyRdd.countAsync()
     // Wait for the job to have started
     sem.acquire(1)
+    // Give it time to make it to the worker otherwise we'll block
+    Thread.sleep(2000) // 2s
     // Decommission all the executors, this should not halt the current task.
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
     execs.foreach(execId => sched.decommissionExecutor(execId))
-    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 10.seconds)
+    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
     // Try and launch task after decommissioning, this should fail
     val postDecommissioned = input.map(x => x)
     val postDecomAsyncCount = postDecommissioned.countAsync()
     val thrown = intercept[java.util.concurrent.TimeoutException]{
-      val result = ThreadUtils.awaitResult(postDecomAsyncCount, 10.seconds)
+      val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds)
     }
     assert(postDecomAsyncCount.isCompleted === false,
       "After exec decommission new task could not launch")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org