Posted to commits@spark.apache.org by va...@apache.org on 2018/04/19 22:06:33 UTC

spark git commit: [SPARK-24022][TEST] Make SparkContextSuite not flaky

Repository: spark
Updated Branches:
  refs/heads/master 9ea8d3d31 -> e55953b0b


[SPARK-24022][TEST] Make SparkContextSuite not flaky

## What changes were proposed in this pull request?

SparkContextSuite.test("Cancelling stages/jobs with custom reasons.") could get stuck in an infinite loop because of the problem found and fixed in [SPARK-23775](https://issues.apache.org/jira/browse/SPARK-23775).

This PR fixes the flakiness by removing the shared-variable usage when cancellation happens in a loop, and by using `wait` and a `CountDownLatch` for synchronization.
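
As an illustration of the new synchronization pattern, here is a minimal sketch distilled from the diff below. It is a test-body fragment, not a complete program, and it assumes an active `SparkContext` named `sc`:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// The listener performs the cancellation itself and then signals the test
// thread through the latch, instead of communicating through shared
// @volatile flags that could be left in a stale state between iterations.
val latch = new CountDownLatch(1)

val listener = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    sc.cancelJob(jobStart.jobId, "You shall not pass")
    latch.countDown()  // cancellation has been issued
  }
}
sc.addSparkListener(listener)

// ... run a job whose tasks block in wait(); expect a SparkException ...

// Bounded wait for the listener to have cancelled, instead of polling.
latch.await(20, TimeUnit.SECONDS)
```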

## How was this patch tested?

Existing unit test.

Author: Gabor Somogyi <ga...@gmail.com>

Closes #21105 from gaborgsomogyi/SPARK-24022.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e55953b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e55953b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e55953b0

Branch: refs/heads/master
Commit: e55953b0bf2a80b34127ba123417ee54955a6064
Parents: 9ea8d3d
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Thu Apr 19 15:06:27 2018 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Apr 19 15:06:27 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkContextSuite.scala    | 61 +++++++++-----------
 1 file changed, 26 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e55953b0/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index b30bd74..ce9f2be 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io.File
 import java.net.{MalformedURLException, URI}
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.{Semaphore, TimeUnit}
+import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
 
 import scala.concurrent.duration._
 
@@ -498,45 +498,36 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 
   test("Cancelling stages/jobs with custom reasons.") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
     val REASON = "You shall not pass"
-    val slices = 10
 
-    val listener = new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-        if (SparkContextSuite.cancelStage) {
-          eventually(timeout(10.seconds)) {
-            assert(SparkContextSuite.isTaskStarted)
+    for (cancelWhat <- Seq("stage", "job")) {
+      // This countdown latch is used to make sure the stage or job is canceled in the listener
+      val latch = new CountDownLatch(1)
+
+      val listener = cancelWhat match {
+        case "stage" =>
+          new SparkListener {
+            override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+              sc.cancelStage(taskStart.stageId, REASON)
+              latch.countDown()
+            }
           }
-          sc.cancelStage(taskStart.stageId, REASON)
-          SparkContextSuite.cancelStage = false
-          SparkContextSuite.semaphore.release(slices)
-        }
-      }
-
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        if (SparkContextSuite.cancelJob) {
-          eventually(timeout(10.seconds)) {
-            assert(SparkContextSuite.isTaskStarted)
+        case "job" =>
+          new SparkListener {
+            override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+              sc.cancelJob(jobStart.jobId, REASON)
+              latch.countDown()
+            }
           }
-          sc.cancelJob(jobStart.jobId, REASON)
-          SparkContextSuite.cancelJob = false
-          SparkContextSuite.semaphore.release(slices)
-        }
       }
-    }
-    sc.addSparkListener(listener)
-
-    for (cancelWhat <- Seq("stage", "job")) {
-      SparkContextSuite.semaphore.drainPermits()
-      SparkContextSuite.isTaskStarted = false
-      SparkContextSuite.cancelStage = (cancelWhat == "stage")
-      SparkContextSuite.cancelJob = (cancelWhat == "job")
+      sc.addSparkListener(listener)
 
       val ex = intercept[SparkException] {
-        sc.range(0, 10000L, numSlices = slices).mapPartitions { x =>
-          SparkContextSuite.isTaskStarted = true
-          // Block waiting for the listener to cancel the stage or job.
-          SparkContextSuite.semaphore.acquire()
+        sc.range(0, 10000L, numSlices = 10).mapPartitions { x =>
+          x.synchronized {
+            x.wait()
+          }
           x
         }.count()
       }
@@ -550,9 +541,11 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
           fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
       }
 
+      latch.await(20, TimeUnit.SECONDS)
       eventually(timeout(20.seconds)) {
         assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
       }
+      sc.removeSparkListener(listener)
     }
   }
 
@@ -637,8 +630,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 }
 
 object SparkContextSuite {
-  @volatile var cancelJob = false
-  @volatile var cancelStage = false
   @volatile var isTaskStarted = false
   @volatile var taskKilled = false
   @volatile var taskSucceeded = false
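
Note that the rewritten test blocks each task in `x.wait()` and sets `SPARK_JOB_INTERRUPT_ON_CANCEL` to `true`, so cancellation interrupts the task thread and `Object.wait()` throws `InterruptedException`, releasing the blocked task. A minimal standalone sketch of that JVM mechanism (not part of the patch; names are illustrative):

```scala
// Demonstrates that a thread blocked in wait() is released by interrupt(),
// which is how cancelled tasks escape the x.wait() call in the test.
object InterruptWaitDemo {
  def main(args: Array[String]): Unit = {
    val lock = new Object
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          lock.synchronized { lock.wait() }  // blocks indefinitely
        } catch {
          case _: InterruptedException =>
            println("worker interrupted, exiting")  // the cancellation path
        }
      }
    })
    worker.start()
    worker.interrupt()  // wait() throws even if the interrupt arrives before
                        // the worker enters wait(), because the interrupt
                        // status is checked on entry
    worker.join()
  }
}
```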

