You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/05/07 06:45:35 UTC

spark git commit: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

Repository: spark
Updated Branches:
  refs/heads/master d2aa859b4 -> c5981976f


[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") stays sometimes in an infinite loop and times out the build.

There were multiple issues with the test:

1. The first valid stageId is zero when the test started alone and not in a suite and the following code waits until timeout:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. The `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset which ended up in canceling the same stage 2 times. This caused the infinite wait.

This PR solves this mentioned flakyness by removing the shared `DataFrameRangeSuite.stageToKill` and using `onTaskStart` where stage ID is provided. In order to make sure cancelStage called for all stages `waitUntilEmpty` is called on `ListenerBus`.

In [PR20888](https://github.com/apache/spark/pull/20888) this tried to get solved by:
* Stopping the executor thread with `wait`
* Wait for all `cancelStage` called
* Kill the executor thread by setting `SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL`

but the thread killing left the shared `SparkContext` sometimes in a state where further jobs can't be submitted. As a result DataFrameRangeSuite.test("Cancelling stage in a query with Range.") test passed properly but the next test inside the suite was hanging.

## How was this patch tested?

Existing unit test executed 10k times.

Author: Gabor Somogyi <ga...@gmail.com>

Closes #21214 from gaborgsomogyi/SPARK-23775_1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5981976
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5981976
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5981976

Branch: refs/heads/master
Commit: c5981976f1d514a3ad8a684b9a21cebe38b786fa
Parents: d2aa859
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Mon May 7 14:45:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon May 7 14:45:14 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/DataFrameRangeSuite.scala  | 24 +++++++-------------
 1 file changed, 8 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c5981976/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 57a930d..b0b4664 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -23,8 +23,8 @@ import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkException, TaskContext}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.SparkException
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }
 
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 100000000000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 100000000000L, 1, 1)
+            .toDF("id").agg(sum("id")).collect()
         }
         ex.getCause() match {
           case null =>
@@ -180,6 +174,8 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
             fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
         }
       }
+      // Wait until all ListenerBus events consumed to make sure cancelStage called for all stages
+      sparkContext.listenerBus.waitUntilEmpty(20.seconds.toMillis)
       eventually(timeout(20.seconds)) {
         assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
       }
@@ -204,7 +200,3 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
     }
   }
 }
-
-object DataFrameRangeSuite {
-  @volatile var stageToKill = -1
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org