You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/07 09:21:16 UTC
spark git commit: [SPARK-7430] [STREAMING] [TEST] General
improvements to streaming tests to increase debuggability
Repository: spark
Updated Branches:
refs/heads/master 2d6612cc8 -> cfdadcbd2
[SPARK-7430] [STREAMING] [TEST] General improvements to streaming tests to increase debuggability
Author: Tathagata Das <ta...@gmail.com>
Closes #5961 from tdas/SPARK-7430 and squashes the following commits:
d654978 [Tathagata Das] Fix scala style
fbf7174 [Tathagata Das] Added more verbose assert failure messages.
6aea07a [Tathagata Das] Ensure SynchronizedBuffer is used in every TestSuiteBase
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfdadcbd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfdadcbd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfdadcbd
Branch: refs/heads/master
Commit: cfdadcbd2b529cd9ac721509a7ebafe436afcd8d
Parents: 2d6612c
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu May 7 00:21:10 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu May 7 00:21:10 2015 -0700
----------------------------------------------------------------------
.../apache/spark/streaming/TestSuiteBase.scala | 33 +++++++++++++-------
1 file changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cfdadcbd/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 4d0cd75..4f70ae7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -73,9 +73,11 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
*
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
-class TestOutputStream[T: ClassTag](parent: DStream[T],
- val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
- extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+class TestOutputStream[T: ClassTag](
+ parent: DStream[T],
+ val output: SynchronizedBuffer[Seq[T]] =
+ new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+ ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
}) {
@@ -95,8 +97,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
* The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
* containing a sequence of items.
*/
-class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
- val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
+class TestOutputStreamWithPartitions[T: ClassTag](
+ parent: DStream[T],
+ val output: SynchronizedBuffer[Seq[Seq[T]]] =
+ new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]])
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
output += collected
@@ -108,10 +112,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
ois.defaultReadObject()
output.clear()
}
-
- def toTestOutputStream: TestOutputStream[T] = {
- new TestOutputStream[T](this.parent, this.output.map(_.flatten))
- }
}
/**
@@ -425,12 +425,21 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
logInfo("--------------------------------")
// Match the output with the expected output
- assert(output.size === expectedOutput.size, "Number of outputs do not match")
for (i <- 0 until output.size) {
if (useSet) {
- assert(output(i).toSet === expectedOutput(i).toSet)
+ assert(
+ output(i).toSet === expectedOutput(i).toSet,
+ s"Set comparison failed\n" +
+ s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
+ s"Generated output (${output.size} items): ${output.mkString("\n")}"
+ )
} else {
- assert(output(i).toList === expectedOutput(i).toList)
+ assert(
+ output(i).toList === expectedOutput(i).toList,
+ s"Ordered list comparison failed\n" +
+ s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
+ s"Generated output (${output.size} items): ${output.mkString("\n")}"
+ )
}
}
logInfo("Output verified successfully")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org