Posted to commits@spark.apache.org by ji...@apache.org on 2020/05/28 23:31:08 UTC

[spark] branch branch-3.0 updated: [SPARK-31730][CORE][TEST][3.0] Fix flaky tests in BarrierTaskContextSuite

This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3ff4db9  [SPARK-31730][CORE][TEST][3.0] Fix flaky tests in BarrierTaskContextSuite
3ff4db9 is described below

commit 3ff4db97fb966e35c0b7450a7210cebf5d331be6
Author: Xingbo Jiang <xi...@databricks.com>
AuthorDate: Thu May 28 16:29:40 2020 -0700

    [SPARK-31730][CORE][TEST][3.0] Fix flaky tests in BarrierTaskContextSuite
    
    ### What changes were proposed in this pull request?
    
    Wait until all the executors have started before submitting any job. This avoids the flakiness caused by jobs being submitted before all executors have come up (a sketch of the resulting setup helper appears after the diff).
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #28658 from jiangxb1987/barrierTest.
    
    Authored-by: Xingbo Jiang <xi...@databricks.com>
    Signed-off-by: Xingbo Jiang <xi...@databricks.com>
---
 .../spark/scheduler/BarrierTaskContextSuite.scala  | 22 ++++++----------------
 1 file changed, 6 insertions(+), 16 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 6191e41..764b4b7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -37,10 +37,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
       .setAppName("test-cluster")
       .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
+    TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
   }
 
-  // TODO (SPARK-31730): re-enable it
-  ignore("global sync by barrier() call") {
+  test("global sync by barrier() call") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -57,10 +57,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("share messages with allGather() call") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -78,10 +75,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("throw exception if we attempt to synchronize with different blocking calls") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -100,10 +94,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("successively sync with allGather and barrier") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -129,8 +120,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
     assert(times2.max - times2.min <= 1000)
   }
 
-  // TODO (SPARK-31730): re-enable it
-  ignore("support multiple barrier() call within a single task") {
+  test("support multiple barrier() call within a single task") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
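
For context, here is a minimal sketch of what the suite's shared setup looks like after this patch, assembled from the hunks above. Only the lines visible in the diff are certain; the method signature, the default numWorker value of 4 (matching the removed per-test "local-cluster[4, 1, 1024]" masters), and the import location of TEST_NO_STAGE_RETRY are assumptions.

import org.apache.spark.{SparkConf, SparkContext, TestUtils}
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY  // assumed import path

// `sc` is the SparkContext var provided by the LocalSparkContext trait that the suite mixes in.
private def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
  val conf = new SparkConf()
    // assumed master string, matching the per-test configs this patch removes
    .setMaster(s"local-cluster[$numWorker, 1, 1024]")
    .setAppName("test-cluster")
    .set(TEST_NO_STAGE_RETRY, true)
  sc = new SparkContext(conf)
  // New in this patch: block (up to 60 seconds) until all executors have registered,
  // so barrier jobs submitted by the tests never race with executor startup.
  TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
}

Each test then simply calls initLocalClusterSparkContext() before building its barrier RDD, as the hunks above show, instead of constructing its own SparkConf.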


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org