You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/09/03 22:47:38 UTC

spark git commit: [SPARK-10431] [CORE] Fix intermittent test failure. Wait for event queue to be clear

Repository: spark
Updated Branches:
  refs/heads/master 49aff7b9a -> d911c682f


[SPARK-10431] [CORE] Fix intermittent test failure. Wait for event queue to be clear

Author: robbins <ro...@uk.ibm.com>

Closes #8582 from robbinspg/InputOutputMetricsSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d911c682
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d911c682
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d911c682

Branch: refs/heads/master
Commit: d911c682f00cd5c438568c548098e03d3e7ea05c
Parents: 49aff7b
Author: robbins <ro...@uk.ibm.com>
Authored: Thu Sep 3 13:47:22 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Sep 3 13:47:25 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d911c682/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index d3218a5..44eb5a0 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -286,6 +286,10 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
   private def runAndReturnMetrics(job: => Unit,
       collector: (SparkListenerTaskEnd) => Option[Long]): Long = {
     val taskMetrics = new ArrayBuffer[Long]()
+
+    // Avoid receiving earlier taskEnd events
+    sc.listenerBus.waitUntilEmpty(500)
+
     sc.addSparkListener(new SparkListener() {
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
         collector(taskEnd).foreach(taskMetrics += _)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org