You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/21 19:59:23 UTC

spark git commit: [SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window

Repository: spark
Updated Branches:
  refs/heads/master ccfe60a83 -> 078c71c2d


[SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window

## What changes were proposed in this pull request?

The issue in this test is that the cleanup of RDDs may not finish before the StreamingContext is stopped. This PR wraps the assertions in `eventually` and runs them before stopping the StreamingContext.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16362 from zsxwing/SPARK-18954.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/078c71c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/078c71c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/078c71c2

Branch: refs/heads/master
Commit: 078c71c2dcbb1470d22f8eb8138fb17e3d7c2414
Parents: ccfe60a
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Dec 21 11:59:21 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Dec 21 11:59:21 2016 -0800

----------------------------------------------------------------------
 .../spark/streaming/BasicOperationsSuite.scala  | 98 ++++++++++++--------
 .../apache/spark/streaming/TestSuiteBase.scala  | 19 +++-
 2 files changed, 73 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/078c71c2/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 4e702bb..a3062ac 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming
 
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 import scala.reflect.ClassTag
 
+import org.scalatest.concurrent.Eventually.eventually
+
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
@@ -657,48 +657,57 @@ class BasicOperationsSuite extends TestSuiteBase {
        .window(Seconds(4), Seconds(2))
     }
 
-    val operatedStream = runCleanupTest(conf, operation _,
-      numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
-    val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
-    val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
-    val mappedStream = windowedStream1.dependencies.head
-
-    // Checkpoint remember durations
-    assert(windowedStream2.rememberDuration === rememberDuration)
-    assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
-    assert(mappedStream.rememberDuration ===
-      rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
-
-    // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
-    // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
-    // MappedStream should remember till 2 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
-
-    // WindowedStream2
-    assert(windowedStream2.generatedRDDs.contains(Time(10000)))
-    assert(windowedStream2.generatedRDDs.contains(Time(8000)))
-    assert(!windowedStream2.generatedRDDs.contains(Time(6000)))
-
-    // WindowedStream1
-    assert(windowedStream1.generatedRDDs.contains(Time(10000)))
-    assert(windowedStream1.generatedRDDs.contains(Time(4000)))
-    assert(!windowedStream1.generatedRDDs.contains(Time(3000)))
-
-    // MappedStream
-    assert(mappedStream.generatedRDDs.contains(Time(10000)))
-    assert(mappedStream.generatedRDDs.contains(Time(2000)))
-    assert(!mappedStream.generatedRDDs.contains(Time(1000)))
+    runCleanupTest(
+        conf,
+        operation _,
+        numExpectedOutput = cleanupTestInput.size / 2,
+        rememberDuration = Seconds(3)) { operatedStream =>
+      eventually(eventuallyTimeout) {
+        val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+        val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
+        val mappedStream = windowedStream1.dependencies.head
+
+        // Checkpoint remember durations
+        assert(windowedStream2.rememberDuration === rememberDuration)
+        assert(
+          windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
+        assert(mappedStream.rememberDuration ===
+          rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
+
+        // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
+        // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
+        // MappedStream should remember till 2 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
+
+        // WindowedStream2
+        assert(windowedStream2.generatedRDDs.contains(Time(10000)))
+        assert(windowedStream2.generatedRDDs.contains(Time(8000)))
+        assert(!windowedStream2.generatedRDDs.contains(Time(6000)))
+
+        // WindowedStream1
+        assert(windowedStream1.generatedRDDs.contains(Time(10000)))
+        assert(windowedStream1.generatedRDDs.contains(Time(4000)))
+        assert(!windowedStream1.generatedRDDs.contains(Time(3000)))
+
+        // MappedStream
+        assert(mappedStream.generatedRDDs.contains(Time(10000)))
+        assert(mappedStream.generatedRDDs.contains(Time(2000)))
+        assert(!mappedStream.generatedRDDs.contains(Time(1000)))
+      }
+    }
   }
 
   test("rdd cleanup - updateStateByKey") {
     val updateFunc = (values: Seq[Int], state: Option[Int]) => {
       Some(values.sum + state.getOrElse(0))
     }
-    val stateStream = runCleanupTest(
-      conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
-
-    assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
-    assert(stateStream.generatedRDDs.contains(Time(10000)))
-    assert(!stateStream.generatedRDDs.contains(Time(4000)))
+    runCleanupTest(
+      conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3))) { stateStream =>
+      eventually(eventuallyTimeout) {
+        assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
+        assert(stateStream.generatedRDDs.contains(Time(10000)))
+        assert(!stateStream.generatedRDDs.contains(Time(4000)))
+      }
+    }
   }
 
   test("rdd cleanup - input blocks and persisted RDDs") {
@@ -779,13 +788,16 @@ class BasicOperationsSuite extends TestSuiteBase {
     }
   }
 
-  /** Test cleanup of RDDs in DStream metadata */
+  /**
+   * Test cleanup of RDDs in DStream metadata. `assertCleanup` is the function that asserts the
+   * cleanup of RDDs is successful.
+   */
   def runCleanupTest[T: ClassTag](
       conf2: SparkConf,
       operation: DStream[Int] => DStream[T],
       numExpectedOutput: Int = cleanupTestInput.size,
       rememberDuration: Duration = null
-    ): DStream[T] = {
+    )(assertCleanup: (DStream[T]) => Unit): DStream[T] = {
 
     // Setup the stream computation
     assert(batchDuration === Seconds(1),
@@ -794,7 +806,11 @@ class BasicOperationsSuite extends TestSuiteBase {
       val operatedStream =
         ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
       if (rememberDuration != null) ssc.remember(rememberDuration)
-      val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
+      val output = runStreams[(Int, Int)](
+        ssc,
+        cleanupTestInput.size,
+        numExpectedOutput,
+        () => assertCleanup(operatedStream))
       val clock = ssc.scheduler.clock.asInstanceOf[Clock]
       assert(clock.getTimeMillis() === Seconds(10).milliseconds)
       assert(output.size === numExpectedOutput)

http://git-wip-us.apache.org/repos/asf/spark/blob/078c71c2/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index fa975a1..dbab708 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -359,14 +359,20 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
    * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
    *
    * Returns a sequence of items for each RDD.
+   *
+   * @param ssc The StreamingContext
+   * @param numBatches The number of batches that should be run
+   * @param numExpectedOutput The number of expected outputs
+   * @param preStop The function to run before stopping StreamingContext
    */
   def runStreams[V: ClassTag](
       ssc: StreamingContext,
       numBatches: Int,
-      numExpectedOutput: Int
+      numExpectedOutput: Int,
+      preStop: () => Unit = () => {}
     ): Seq[Seq[V]] = {
     // Flatten each RDD into a single Seq
-    runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
+    runStreamsWithPartitions(ssc, numBatches, numExpectedOutput, preStop).map(_.flatten.toSeq)
   }
 
   /**
@@ -376,11 +382,17 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
    *
    * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
    * representing one partition.
+   *
+   * @param ssc The StreamingContext
+   * @param numBatches The number of batches that should be run
+   * @param numExpectedOutput The number of expected outputs
+   * @param preStop The function to run before stopping StreamingContext
    */
   def runStreamsWithPartitions[V: ClassTag](
       ssc: StreamingContext,
       numBatches: Int,
-      numExpectedOutput: Int
+      numExpectedOutput: Int,
+      preStop: () => Unit = () => {}
     ): Seq[Seq[Seq[V]]] = {
     assert(numBatches > 0, "Number of batches to run stream computation is zero")
     assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
@@ -424,6 +436,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
       assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")
 
       Thread.sleep(100) // Give some time for the forgetting old RDDs to complete
+      preStop()
     } finally {
       ssc.stop(stopSparkContext = true)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org