You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/15 23:46:06 UTC
spark git commit: [SPARK-18868][FLAKY-TEST] Deflake
StreamingQueryListenerSuite: single listener, check trigger...
Repository: spark
Updated Branches:
refs/heads/master 32ff96452 -> 9c7f83b02
[SPARK-18868][FLAKY-TEST] Deflake StreamingQueryListenerSuite: single listener, check trigger...
## What changes were proposed in this pull request?
Use `recentProgress` instead of `lastProgress` and select the last progress update with a non-zero number of input rows. Also wrap the last `AssertOnQuery` in `eventually`, similar to the first `AssertOnQuery`.
## How was this patch tested?
Ran test 1000 times
Author: Burak Yavuz <br...@gmail.com>
Closes #16287 from brkyvz/SPARK-18868.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c7f83b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c7f83b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c7f83b0
Branch: refs/heads/master
Commit: 9c7f83b0289ba4550b156e6af31cf7c44580eb12
Parents: 32ff964
Author: Burak Yavuz <br...@gmail.com>
Authored: Thu Dec 15 15:46:03 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Dec 15 15:46:03 2016 -0800
----------------------------------------------------------------------
.../streaming/StreamingQueryListenerSuite.scala | 25 +++++++++++++-------
1 file changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9c7f83b0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 7c6745ac..a057d1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -84,7 +84,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
CheckAnswer(10, 5),
AssertOnQuery { query =>
assert(listener.progressEvents.nonEmpty)
- assert(listener.progressEvents.last.json === query.lastProgress.json)
+ // SPARK-18868: We can't use query.lastProgress, because in progressEvents, we filter
+ // out zero input row triggers, but the lastProgress may be a zero input row trigger
+ val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption
+ .getOrElse(fail("No progress updates received in StreamingQuery!"))
+ assert(listener.progressEvents.last.json === lastNonZeroProgress.json)
assert(listener.terminationEvent === null)
true
},
@@ -109,14 +113,17 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
AdvanceManualClock(100),
ExpectFailure[SparkException],
AssertOnQuery { query =>
- assert(listener.terminationEvent !== null)
- assert(listener.terminationEvent.id === query.id)
- assert(listener.terminationEvent.exception.nonEmpty)
- // Make sure that the exception message reported through listener
- // contains the actual exception and relevant stack trace
- assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
- assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
- assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
+ eventually(Timeout(streamingTimeout)) {
+ assert(listener.terminationEvent !== null)
+ assert(listener.terminationEvent.id === query.id)
+ assert(listener.terminationEvent.exception.nonEmpty)
+ // Make sure that the exception message reported through listener
+ // contains the actual exception and relevant stack trace
+ assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+ assert(
+ listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+ assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
+ }
listener.checkAsyncErrors()
true
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org