You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ji...@apache.org on 2020/05/28 00:22:13 UTC

[spark] branch branch-3.0 updated: Revert "[SPARK-31730][CORE][TEST] Fix flaky tests in BarrierTaskContextSuite"

This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e7b88e8  Revert "[SPARK-31730][CORE][TEST] Fix flaky tests in BarrierTaskContextSuite"
e7b88e8 is described below

commit e7b88e82ec5cee0a738f96127b106358cc97cb4f
Author: Xingbo Jiang <xi...@databricks.com>
AuthorDate: Wed May 27 17:21:10 2020 -0700

    Revert "[SPARK-31730][CORE][TEST] Fix flaky tests in BarrierTaskContextSuite"
    
    This reverts commit cb817bb1cf6b074e075c02880001ec96f2f39de7.
---
 .../spark/scheduler/BarrierTaskContextSuite.scala  | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 54899bf..6191e41 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -25,7 +25,6 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
-import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 
 class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
@@ -38,10 +37,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
       .setAppName("test-cluster")
       .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
-    TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
   }
 
-  test("global sync by barrier() call") {
+  // TODO (SPARK-31730): re-enable it
+  ignore("global sync by barrier() call") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -58,7 +57,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("share messages with allGather() call") {
-    initLocalClusterSparkContext()
+    val conf = new SparkConf()
+      .setMaster("local-cluster[4, 1, 1024]")
+      .setAppName("test-cluster")
+    sc = new SparkContext(conf)
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -76,7 +78,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("throw exception if we attempt to synchronize with different blocking calls") {
-    initLocalClusterSparkContext()
+    val conf = new SparkConf()
+      .setMaster("local-cluster[4, 1, 1024]")
+      .setAppName("test-cluster")
+    sc = new SparkContext(conf)
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -95,7 +100,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("successively sync with allGather and barrier") {
-    initLocalClusterSparkContext()
+    val conf = new SparkConf()
+      .setMaster("local-cluster[4, 1, 1024]")
+      .setAppName("test-cluster")
+    sc = new SparkContext(conf)
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -121,7 +129,8 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
     assert(times2.max - times2.min <= 1000)
   }
 
-  test("support multiple barrier() call within a single task") {
+  // TODO (SPARK-31730): re-enable it
+  ignore("support multiple barrier() call within a single task") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -276,9 +285,6 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
 
   test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
     initLocalClusterSparkContext(2)
-    // It's required to reset the delay timer when a task is scheduled, otherwise all the tasks
-    // could get scheduled at ANY level.
-    sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true)
     val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
     val dep = new OneToOneDependency[Int](rdd0)
     // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org