Posted to commits@spark.apache.org by an...@apache.org on 2015/06/04 00:03:10 UTC

spark git commit: [SPARK-8001] [CORE] Make AsynchronousListenerBus.waitUntilEmpty throw TimeoutException if timeout

Repository: spark
Updated Branches:
  refs/heads/master aa40c4420 -> 1d8669f15


[SPARK-8001] [CORE] Make AsynchronousListenerBus.waitUntilEmpty throw TimeoutException if timeout

Some call sites forget to wrap `AsynchronousListenerBus.waitUntilEmpty` in `assert`, so a timed-out wait goes unnoticed. Instead of adding `assert` in each of these places, I think it's better to make `AsynchronousListenerBus.waitUntilEmpty` throw `TimeoutException` when it times out.
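
For illustration, here is a minimal, self-contained Scala sketch of the new contract. `DemoBus` and its fields are stand-ins written for this example, not Spark code; only the body of `waitUntilEmpty` mirrors the implementation in this commit:

    import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}

    object WaitUntilEmptyDemo {
      // Stand-in for AsynchronousListenerBus, reduced to a queue and the wait.
      class DemoBus {
        val queue = new LinkedBlockingQueue[String]()

        // New contract from this commit: Unit return, TimeoutException on timeout.
        @throws(classOf[TimeoutException])
        def waitUntilEmpty(timeoutMillis: Long): Unit = {
          val finishTime = System.currentTimeMillis + timeoutMillis
          while (!queue.isEmpty) {
            if (System.currentTimeMillis > finishTime) {
              throw new TimeoutException(
                s"The event queue is not empty after $timeoutMillis milliseconds")
            }
            // Poll rather than wait/notify; this mirrors the testing-only use case.
            Thread.sleep(10)
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val bus = new DemoBus
        bus.queue.put("pending-event") // never drained, so the wait must time out
        try {
          bus.waitUntilEmpty(50)
        } catch {
          case e: TimeoutException => println(s"Caught as expected: ${e.getMessage}")
        }
      }
    }

The old signature returned `false` on timeout, so a call site that omitted `assert` silently ignored the failure; throwing makes every call site fail loudly.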

Author: zsxwing <zs...@gmail.com>

Closes #6550 from zsxwing/SPARK-8001 and squashes the following commits:

607674a [zsxwing] Make AsynchronousListenerBus.waitUntilEmpty throw TimeoutException if timeout


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d8669f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d8669f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d8669f1

Branch: refs/heads/master
Commit: 1d8669f15c136cd81f494dd487400c62c9498602
Parents: aa40c44
Author: zsxwing <zs...@gmail.com>
Authored: Wed Jun 3 15:03:07 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Jun 3 15:03:07 2015 -0700

----------------------------------------------------------------------
 .../spark/util/AsynchronousListenerBus.scala    | 11 +++++-----
 .../spark/deploy/LogUrlsStandaloneSuite.scala   |  4 ++--
 .../spark/scheduler/DAGSchedulerSuite.scala     | 18 ++++++++--------
 .../spark/scheduler/SparkListenerSuite.scala    | 22 ++++++++++----------
 .../SparkListenerWithClusterSuite.scala         |  2 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala    |  2 +-
 6 files changed, 30 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1d8669f1/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 1861d38..61b5a4c 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -120,21 +120,22 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 
   /**
    * For testing only. Wait until there are no more events in the queue, or until the specified
-   * time has elapsed. Return true if the queue has emptied and false is the specified time
-   * elapsed before the queue emptied.
+   * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
+   * emptied.
    */
   @VisibleForTesting
-  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+  @throws(classOf[TimeoutException])
+  def waitUntilEmpty(timeoutMillis: Long): Unit = {
     val finishTime = System.currentTimeMillis + timeoutMillis
     while (!queueIsEmpty) {
       if (System.currentTimeMillis > finishTime) {
-        return false
+        throw new TimeoutException(
+          s"The event queue is not empty after $timeoutMillis milliseconds")
       }
       /* Sleep rather than using wait/notify, because this is used only for testing and
        * wait/notify add overhead in the general case. */
       Thread.sleep(10)
     }
-    true
   }
 
   /**

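With the new signature in place, a suite that wanted to pin down the timeout path explicitly could use ScalaTest's `intercept`. This is a hypothetical test against the `DemoBus` sketch above, not part of this commit, which only adapts the existing call sites shown below:

    import java.util.concurrent.TimeoutException
    import org.scalatest.FunSuite

    class DemoBusSuite extends FunSuite {
      test("waitUntilEmpty returns once the queue is empty") {
        val bus = new WaitUntilEmptyDemo.DemoBus
        bus.waitUntilEmpty(100) // empty queue: returns immediately
      }

      test("waitUntilEmpty throws TimeoutException if the queue never drains") {
        val bus = new WaitUntilEmptyDemo.DemoBus
        bus.queue.put("stuck-event")
        val e = intercept[TimeoutException] {
          bus.waitUntilEmpty(50)
        }
        assert(e.getMessage.contains("not empty after"))
      }
    }
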
http://git-wip-us.apache.org/repos/asf/spark/blob/1d8669f1/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index c215b05..ddc9281 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -41,7 +41,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
     // Trigger a job so that executors get added
     sc.parallelize(1 to 100, 4).map(_.toString).count()
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
       // Browse to each URL to check that it's valid
@@ -71,7 +71,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
     // Trigger a job so that executors get added
     sc.parallelize(1 to 100, 4).map(_.toString).count()
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
     assert(listeners.size === 1)
     val listener = listeners(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/1d8669f1/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index bfcf918..47b2868 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -254,7 +254,7 @@ class DAGSchedulerSuite
   test("[SPARK-3353] parent stage should have lower stage id") {
     sparkListener.stageByOrderOfExecution.clear()
     sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.stageByOrderOfExecution.length === 2)
     assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
   }
@@ -389,7 +389,7 @@ class DAGSchedulerSuite
     submit(unserializableRdd, Array(0))
     assert(failure.getMessage.startsWith(
       "Job aborted due to stage failure: Task not serializable:"))
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.contains(0))
     assert(sparkListener.failedStages.size === 1)
     assertDataStructuresEmpty()
@@ -399,7 +399,7 @@ class DAGSchedulerSuite
     submit(new MyRDD(sc, 1, Nil), Array(0))
     failed(taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.contains(0))
     assert(sparkListener.failedStages.size === 1)
     assertDataStructuresEmpty()
@@ -410,7 +410,7 @@ class DAGSchedulerSuite
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
     assert(failure.getMessage === s"Job $jobId cancelled ")
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.contains(0))
     assert(sparkListener.failedStages.size === 1)
     assertDataStructuresEmpty()
@@ -462,7 +462,7 @@ class DAGSchedulerSuite
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.isEmpty)
     assert(sparkListener.successfulStages.contains(0))
   }
@@ -531,7 +531,7 @@ class DAGSchedulerSuite
       Map[Long, Any](),
       createFakeTaskInfo(),
       null))
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.contains(1))
 
     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
@@ -543,7 +543,7 @@ class DAGSchedulerSuite
       createFakeTaskInfo(),
       null))
     // The SparkListener should not receive redundant failure events.
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.size == 1)
   }
 
@@ -592,7 +592,7 @@ class DAGSchedulerSuite
 
     // Listener bus should get told about the map stage failing, but not the reduce stage
     // (since the reduce stage hasn't been started yet).
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.toSet === Set(0))
 
     assertDataStructuresEmpty()
@@ -643,7 +643,7 @@ class DAGSchedulerSuite
     assert(cancelledStages.toSet === Set(0, 2))
 
     // Make sure the listeners got told about both failed stages.
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.successfulStages.isEmpty)
     assert(sparkListener.failedStages.toSet === Set(0, 2))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1d8669f1/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 06fb909..651295b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -47,7 +47,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     // Starting listener bus should flush all buffered events
     bus.start(sc)
-    assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
 
     // After listener bus has stopped, posting events should not increment counter
@@ -131,7 +131,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     rdd2.setName("Target RDD")
     rdd2.count()
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     listener.stageInfos.size should be {1}
     val (stageInfo, taskInfoMetrics) = listener.stageInfos.head
@@ -156,7 +156,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     rdd3.setName("Trois")
 
     rdd1.count()
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.stageInfos.size should be {1}
     val stageInfo1 = listener.stageInfos.keys.find(_.stageId == 0).get
     stageInfo1.rddInfos.size should be {1} // ParallelCollectionRDD
@@ -165,7 +165,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     listener.stageInfos.clear()
 
     rdd2.count()
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.stageInfos.size should be {1}
     val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get
     stageInfo2.rddInfos.size should be {3} // ParallelCollectionRDD, FilteredRDD, MappedRDD
@@ -174,7 +174,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     listener.stageInfos.clear()
 
     rdd3.count()
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.stageInfos.size should be {2} // Shuffle map stage + result stage
     val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
     stageInfo3.rddInfos.size should be {1} // ShuffledRDD
@@ -190,7 +190,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val rdd2 = rdd1.map(_.toString)
     sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true)
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     listener.stageInfos.size should be {1}
     val (stageInfo, _) = listener.stageInfos.head
@@ -214,7 +214,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     val d = sc.parallelize(0 to 1e4.toInt, 64).map(w)
     d.count()
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.stageInfos.size should be (1)
 
     val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
@@ -225,7 +225,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     d4.setName("A Cogroup")
     d4.collectAsMap()
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.stageInfos.size should be (4)
     listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) =>
       /**
@@ -281,7 +281,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
       .reduce { case (x, y) => x }
     assert(result === 1.to(akkaFrameSize).toArray)
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     val TASK_INDEX = 0
     assert(listener.startedTasks.contains(TASK_INDEX))
     assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
@@ -297,7 +297,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
     assert(result === 2)
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     val TASK_INDEX = 0
     assert(listener.startedTasks.contains(TASK_INDEX))
     assert(listener.startedGettingResultTasks.isEmpty)
@@ -352,7 +352,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
-    assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     // The exception should be caught, and the event should be propagated to other listeners
     assert(bus.listenerThreadIsAlive)

http://git-wip-us.apache.org/repos/asf/spark/blob/1d8669f1/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index c7f179e..50273bc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -46,7 +46,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
     rdd2.setName("Target RDD")
     rdd2.count()
 
-    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(listener.addedExecutorInfo.size == 2)
     assert(listener.addedExecutorInfo("0").totalCores == 1)
     assert(listener.addedExecutorInfo("1").totalCores == 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/1d8669f1/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d8bc253..bc42e12 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -326,7 +326,7 @@ private object YarnClusterDriver extends Logging with Matchers {
     var result = "failure"
     try {
       val data = sc.parallelize(1 to 4, 4).collect().toSet
-      assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+      sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
       data should be (Set(1, 2, 3, 4))
       result = "success"
     } finally {

