Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/22 00:32:51 UTC

spark git commit: [SPARK-13405][STREAMING][TESTS] Make sure no messages leak to the next test

Repository: spark
Updated Branches:
  refs/heads/master 03e62aa3f -> 76bd98d91


[SPARK-13405][STREAMING][TESTS] Make sure no messages leak to the next test

## What changes were proposed in this pull request?

Fixed the test failure `org.apache.spark.sql.util.ContinuousQueryListenerSuite.event ordering`: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.6/202/testReport/junit/org.apache.spark.sql.util/ContinuousQueryListenerSuite/event_ordering/

```
org.scalatest.exceptions.TestFailedException:
Assert failed: : null equaled null onQueryTerminated called before onQueryStarted
	org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
	org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
	org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
	org.apache.spark.sql.util.ContinuousQueryListenerSuite$QueryStatusCollector$$anonfun$onQueryTerminated$1.apply$mcV$sp(ContinuousQueryListenerSuite.scala:204)
	org.scalatest.concurrent.AsyncAssertions$Waiter.apply(AsyncAssertions.scala:349)
	org.apache.spark.sql.util.ContinuousQueryListenerSuite$QueryStatusCollector.onQueryTerminated(ContinuousQueryListenerSuite.scala:203)
	org.apache.spark.sql.execution.streaming.ContinuousQueryListenerBus.doPostEvent(ContinuousQueryListenerBus.scala:67)
	org.apache.spark.sql.execution.streaming.ContinuousQueryListenerBus.doPostEvent(ContinuousQueryListenerBus.scala:32)
	org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
	org.apache.spark.sql.execution.streaming.ContinuousQueryListenerBus.postToAll(ContinuousQueryListenerBus.scala:32)
```

In the previous code, when the test `adding and removing listener` finished, some QueryTerminated events could still be in the listener bus queue. When `event ordering` then started to run, it could see these leftover events and throw the above exception.

This PR adds a `waitUntilEmpty` call to `after` to make sure all events are consumed after each test.
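
The effect of draining the queue between tests can be sketched with a toy asynchronous bus. This is only an illustration, not Spark's listener bus: `ToyListenerBus`, its `pending` counter, the polling loop, and the timeout behavior are all assumptions made for this example. The only detail taken from the patch is that `waitUntilEmpty` is called with a timeout in milliseconds.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue, TimeoutException}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an asynchronous listener bus (NOT Spark's implementation).
class ToyListenerBus(handler: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]()
  // Number of events posted whose handler has not finished yet.
  private val pending = new AtomicInteger(0)

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          val event = queue.take()
          try handler(event) finally pending.decrementAndGet()
        }
      } catch {
        case _: InterruptedException => // stop() was called; exit the loop
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  // Returns immediately; the event is delivered later on the worker thread.
  def post(event: String): Unit = {
    pending.incrementAndGet()
    queue.put(event)
  }

  // Blocks until every posted event has been handled, or throws on timeout.
  def waitUntilEmpty(timeoutMillis: Long): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (pending.get() > 0) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(s"Bus not empty after $timeoutMillis ms")
      }
      Thread.sleep(10)
    }
  }

  def stop(): Unit = worker.interrupt()
}

object LeakDemo {
  def main(args: Array[String]): Unit = {
    val seen = new CopyOnWriteArrayList[String]()
    // A deliberately slow listener, so the event is still queued when post() returns.
    val bus = new ToyListenerBus(e => { Thread.sleep(50); seen.add(e); () })

    bus.post("QueryTerminated") // posted near the end of "test 1"
    bus.waitUntilEmpty(10000)   // the drain step that would run in `after`
    assert(seen.size == 1)      // nothing is left to leak into "test 2"
    bus.stop()
  }
}
```

Without the `waitUntilEmpty` call, the slow listener in this sketch would still be processing the event after `post` returns, which mirrors how a late QueryTerminated event could reach a collector registered by the next test.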

## How was this patch tested?

Jenkins tests.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #11275 from zsxwing/SPARK-13405.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76bd98d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76bd98d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76bd98d9

Branch: refs/heads/master
Commit: 76bd98d914ecdf6ff0d73d9d8d221fe3403f8627
Parents: 03e62aa
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Sun Feb 21 15:32:49 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Feb 21 15:32:49 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala   | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76bd98d9/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
index d6cc6ad..5278328 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
@@ -41,6 +41,8 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
     sqlContext.streams.active.foreach(_.stop())
     assert(sqlContext.streams.active.isEmpty)
     assert(addedListeners.isEmpty)
+    // Make sure we don't leak any events to the next test
+    sqlContext.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
   test("single listener") {

