You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/04/20 17:22:32 UTC

spark git commit: Revert "[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky"

Repository: spark
Updated Branches:
  refs/heads/branch-2.3 9b562d6fe -> 8eb64a5e2


Revert "[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky"

This reverts commit 130641102ceecf2a795d7f0dc6412c7e56eb03a8.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8eb64a5e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8eb64a5e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8eb64a5e

Branch: refs/heads/branch-2.3
Commit: 8eb64a5e222137578ab14b6c91f08a24bd32a3e4
Parents: 9b562d6
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Apr 20 10:22:14 2018 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Fri Apr 20 10:22:14 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/DataFrameRangeSuite.scala  | 78 +++++++++-----------
 1 file changed, 33 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8eb64a5e/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index a0fd740..57a930d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -17,16 +17,14 @@
 
 package org.apache.spark.sql
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-
 import scala.concurrent.duration._
 import scala.math.abs
 import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -154,53 +152,39 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
   }
 
   test("Cancelling stage in a query with Range.") {
-    // Save and restore the value because SparkContext is shared
-    val savedInterruptOnCancel = sparkContext
-      .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
-
-    try {
-      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
-
-      for (codegen <- Seq(true, false)) {
-        // This countdown latch used to make sure with all the stages cancelStage called in listener
-        val latch = new CountDownLatch(2)
-
-        val listener = new SparkListener {
-          override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-            sparkContext.cancelStage(taskStart.stageId)
-            latch.countDown()
-          }
+    val listener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        eventually(timeout(10.seconds), interval(1.millis)) {
+          assert(DataFrameRangeSuite.stageToKill > 0)
         }
+        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      }
+    }
 
-        sparkContext.addSparkListener(listener)
-        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-          val ex = intercept[SparkException] {
-            sparkContext.range(0, 10000L, numSlices = 10).mapPartitions { x =>
-              x.synchronized {
-                x.wait()
-              }
-              x
-            }.toDF("id").agg(sum("id")).collect()
-          }
-          ex.getCause() match {
-            case null =>
-              assert(ex.getMessage().contains("cancelled"))
-            case cause: SparkException =>
-              assert(cause.getMessage().contains("cancelled"))
-            case cause: Throwable =>
-              fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
-          }
+    sparkContext.addSparkListener(listener)
+    for (codegen <- Seq(true, false)) {
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
+        DataFrameRangeSuite.stageToKill = -1
+        val ex = intercept[SparkException] {
+          spark.range(0, 100000000000L, 1, 1).map { x =>
+            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
+            x
+          }.toDF("id").agg(sum("id")).collect()
         }
-        latch.await(20, TimeUnit.SECONDS)
-        eventually(timeout(20.seconds)) {
-          assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+        ex.getCause() match {
+          case null =>
+            assert(ex.getMessage().contains("cancelled"))
+          case cause: SparkException =>
+            assert(cause.getMessage().contains("cancelled"))
+          case cause: Throwable =>
+            fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
         }
-        sparkContext.removeSparkListener(listener)
       }
-    } finally {
-      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
-        savedInterruptOnCancel)
+      eventually(timeout(20.seconds)) {
+        assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+      }
     }
+    sparkContext.removeSparkListener(listener)
   }
 
   test("SPARK-20430 Initialize Range parameters in a driver side") {
@@ -220,3 +204,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
     }
   }
 }
+
+object DataFrameRangeSuite {
+  @volatile var stageToKill = -1
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org