You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/04/19 22:06:33 UTC
spark git commit: [SPARK-24022][TEST] Make SparkContextSuite not flaky
Repository: spark
Updated Branches:
refs/heads/master 9ea8d3d31 -> e55953b0b
[SPARK-24022][TEST] Make SparkContextSuite not flaky
## What changes were proposed in this pull request?
SparkContextSuite.test("Cancelling stages/jobs with custom reasons.") could get stuck in an infinite loop because of the issue found and fixed in [SPARK-23775](https://issues.apache.org/jira/browse/SPARK-23775).
This PR fixes the flakiness described above by removing shared-variable usage when cancellation happens in a loop, and by using wait() and a CountDownLatch for synchronization.
## How was this patch tested?
Existing unit test.
Author: Gabor Somogyi <ga...@gmail.com>
Closes #21105 from gaborgsomogyi/SPARK-24022.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e55953b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e55953b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e55953b0
Branch: refs/heads/master
Commit: e55953b0bf2a80b34127ba123417ee54955a6064
Parents: 9ea8d3d
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Thu Apr 19 15:06:27 2018 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Apr 19 15:06:27 2018 -0700
----------------------------------------------------------------------
.../org/apache/spark/SparkContextSuite.scala | 61 +++++++++-----------
1 file changed, 26 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e55953b0/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index b30bd74..ce9f2be 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import java.io.File
import java.net.{MalformedURLException, URI}
import java.nio.charset.StandardCharsets
-import java.util.concurrent.{Semaphore, TimeUnit}
+import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
import scala.concurrent.duration._
@@ -498,45 +498,36 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
test("Cancelling stages/jobs with custom reasons.") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
val REASON = "You shall not pass"
- val slices = 10
- val listener = new SparkListener {
- override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
- if (SparkContextSuite.cancelStage) {
- eventually(timeout(10.seconds)) {
- assert(SparkContextSuite.isTaskStarted)
+ for (cancelWhat <- Seq("stage", "job")) {
+ // This countdown latch is used to make sure the stage or job has been canceled in the listener
+ val latch = new CountDownLatch(1)
+
+ val listener = cancelWhat match {
+ case "stage" =>
+ new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ sc.cancelStage(taskStart.stageId, REASON)
+ latch.countDown()
+ }
}
- sc.cancelStage(taskStart.stageId, REASON)
- SparkContextSuite.cancelStage = false
- SparkContextSuite.semaphore.release(slices)
- }
- }
-
- override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
- if (SparkContextSuite.cancelJob) {
- eventually(timeout(10.seconds)) {
- assert(SparkContextSuite.isTaskStarted)
+ case "job" =>
+ new SparkListener {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ sc.cancelJob(jobStart.jobId, REASON)
+ latch.countDown()
+ }
}
- sc.cancelJob(jobStart.jobId, REASON)
- SparkContextSuite.cancelJob = false
- SparkContextSuite.semaphore.release(slices)
- }
}
- }
- sc.addSparkListener(listener)
-
- for (cancelWhat <- Seq("stage", "job")) {
- SparkContextSuite.semaphore.drainPermits()
- SparkContextSuite.isTaskStarted = false
- SparkContextSuite.cancelStage = (cancelWhat == "stage")
- SparkContextSuite.cancelJob = (cancelWhat == "job")
+ sc.addSparkListener(listener)
val ex = intercept[SparkException] {
- sc.range(0, 10000L, numSlices = slices).mapPartitions { x =>
- SparkContextSuite.isTaskStarted = true
- // Block waiting for the listener to cancel the stage or job.
- SparkContextSuite.semaphore.acquire()
+ sc.range(0, 10000L, numSlices = 10).mapPartitions { x =>
+ x.synchronized {
+ x.wait()
+ }
x
}.count()
}
@@ -550,9 +541,11 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
}
+ latch.await(20, TimeUnit.SECONDS)
eventually(timeout(20.seconds)) {
assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
}
+ sc.removeSparkListener(listener)
}
}
@@ -637,8 +630,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
object SparkContextSuite {
- @volatile var cancelJob = false
- @volatile var cancelStage = false
@volatile var isTaskStarted = false
@volatile var taskKilled = false
@volatile var taskSucceeded = false
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org