You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/07 09:21:16 UTC

spark git commit: [SPARK-7430] [STREAMING] [TEST] General improvements to streaming tests to increase debuggability

Repository: spark
Updated Branches:
  refs/heads/master 2d6612cc8 -> cfdadcbd2


[SPARK-7430] [STREAMING] [TEST] General improvements to streaming tests to increase debuggability

Author: Tathagata Das <ta...@gmail.com>

Closes #5961 from tdas/SPARK-7430 and squashes the following commits:

d654978 [Tathagata Das] Fix scala style
fbf7174 [Tathagata Das] Added more verbose assert failure messages.
6aea07a [Tathagata Das] Ensure SynchronizedBuffer is used in every TestSuiteBase


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfdadcbd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfdadcbd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfdadcbd

Branch: refs/heads/master
Commit: cfdadcbd2b529cd9ac721509a7ebafe436afcd8d
Parents: 2d6612c
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu May 7 00:21:10 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu May 7 00:21:10 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/TestSuiteBase.scala  | 33 +++++++++++++-------
 1 file changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cfdadcbd/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 4d0cd75..4f70ae7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -73,9 +73,11 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
  *
  * The buffer contains a sequence of RDD's, each containing a sequence of items
  */
-class TestOutputStream[T: ClassTag](parent: DStream[T],
-    val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
-  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+class TestOutputStream[T: ClassTag](
+    parent: DStream[T],
+    val output: SynchronizedBuffer[Seq[T]] =
+      new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+  ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
     output += collected
   }) {
@@ -95,8 +97,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
  * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
  * containing a sequence of items.
  */
-class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
-    val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
+class TestOutputStreamWithPartitions[T: ClassTag](
+    parent: DStream[T],
+    val output: SynchronizedBuffer[Seq[Seq[T]]] =
+      new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]])
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.glom().collect().map(_.toSeq)
     output += collected
@@ -108,10 +112,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
     ois.defaultReadObject()
     output.clear()
   }
-
-  def toTestOutputStream: TestOutputStream[T] = {
-    new TestOutputStream[T](this.parent, this.output.map(_.flatten))
-  }
 }
 
 /**
@@ -425,12 +425,21 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     logInfo("--------------------------------")
 
     // Match the output with the expected output
-    assert(output.size === expectedOutput.size, "Number of outputs do not match")
     for (i <- 0 until output.size) {
       if (useSet) {
-        assert(output(i).toSet === expectedOutput(i).toSet)
+        assert(
+          output(i).toSet === expectedOutput(i).toSet,
+          s"Set comparison failed\n" +
+            s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
+            s"Generated output (${output.size} items): ${output.mkString("\n")}"
+        )
       } else {
-        assert(output(i).toList === expectedOutput(i).toList)
+        assert(
+          output(i).toList === expectedOutput(i).toList,
+          s"Ordered list comparison failed\n" +
+            s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
+            s"Generated output (${output.size} items): ${output.mkString("\n")}"
+        )
       }
     }
     logInfo("Output verified successfully")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org