You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:29:46 UTC

[1/2] git commit: Added unpersisting and modified testsuite to better test out metadata cleaning.

Updated Branches:
  refs/heads/master b07bc02a0 -> 08b9fec93


Added unpersisting and modified testsuite to better test out metadata cleaning.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/27311b13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/27311b13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/27311b13

Branch: refs/heads/master
Commit: 27311b13321ba60ee1324b86234f0aaf63df9f67
Parents: b93f9d4
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Jan 13 14:57:07 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jan 13 14:57:07 2014 -0800

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  5 +-
 .../streaming/api/java/JavaDStreamLike.scala    |  3 +-
 .../spark/streaming/dstream/DStream.scala       | 11 ++-
 .../dstream/DStreamCheckpointData.scala         |  2 +-
 .../spark/streaming/BasicOperationsSuite.scala  | 72 +++++++++++++-------
 5 files changed, 63 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/27311b13/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d4f74d3..6cc608e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -352,9 +352,8 @@ private[spark] class TaskSchedulerImpl(
       taskResultGetter.stop()
     }
 
-    // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
-    // TODO: Do something better !
-    Thread.sleep(5000L)
+    // sleeping for an arbitrary 1 seconds to ensure that messages are sent out.
+    Thread.sleep(1000L)
   }
 
   override def defaultParallelism() = backend.defaultParallelism()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/27311b13/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 1ec4492..a493a82 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -208,7 +208,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
     dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
   }
 
-
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream. However, the reduction is done incrementally
@@ -410,7 +409,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   }
 
   /**
-   * Enable periodic checkpointing of RDDs of this DStream
+   * Enable periodic checkpointing of RDDs of this DStream.
    * @param interval Time interval after which generated RDD will be checkpointed
    */
   def checkpoint(interval: Duration) = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/27311b13/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index a7c4cca..9e2b36f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -334,6 +334,10 @@ abstract class DStream[T: ClassTag] (
   private[streaming] def clearMetadata(time: Time) {
     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
     generatedRDDs --= oldRDDs.keys
+    if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
+      logDebug("Unpersisting old RDDs: " + oldRDDs.keys.mkString(", "))
+      oldRDDs.values.foreach(_.unpersist(false))
+    }
     logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
       (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
     dependencies.foreach(_.clearMetadata(time))
@@ -751,7 +755,12 @@ abstract class DStream[T: ClassTag] (
     this.foreachRDD(saveFunc)
   }
 
-  def register() {
+  /**
+   * Register this streaming as an output stream. This would ensure that RDDs of this
+   * DStream will be generated.
+   */
+  def register(): DStream[T] = {
     ssc.registerOutputStream(this)
+    this
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/27311b13/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 2da4127..38bad5a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -94,7 +94,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
             }
         }
       case None =>
-        logInfo("Nothing to delete")
+        logDebug("Nothing to delete")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/27311b13/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 7037aae..b73edf8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.SparkContext._
 
 import util.ManualClock
 import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
 
 class BasicOperationsSuite extends TestSuiteBase {
   test("map") {
@@ -395,40 +397,31 @@ class BasicOperationsSuite extends TestSuiteBase {
     Thread.sleep(1000)
   }
 
-  test("forgetting of RDDs - map and window operations") {
-    assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
+  val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
 
-    val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq
+  test("rdd cleanup - map and window") {
     val rememberDuration = Seconds(3)
-
-    assert(input.size === 10, "Number of inputs have changed")
-
     def operation(s: DStream[Int]): DStream[(Int, Int)] = {
       s.map(x => (x % 10, 1))
        .window(Seconds(2), Seconds(1))
        .window(Seconds(4), Seconds(2))
     }
 
-    val ssc = setupStreams(input, operation _)
-    ssc.remember(rememberDuration)
-    runStreams[(Int, Int)](ssc, input.size, input.size / 2)
-
-    val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
-    val windowedStream1 = windowedStream2.dependencies.head
+    val operatedStream = runCleanupTest(conf, operation _,
+      numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
+    val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+    val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
     val mappedStream = windowedStream1.dependencies.head
 
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    assert(clock.time === Seconds(10).milliseconds)
-
-    // IDEALLY
-    // WindowedStream2 should remember till 7 seconds: 10, 8,
-    // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5
-    // MappedStream should remember till 7 seconds:    10, 9, 8, 7, 6, 5, 4, 3,
+    // Checkpoint remember durations
+    assert(windowedStream2.rememberDuration === rememberDuration)
+    assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
+    assert(mappedStream.rememberDuration ===
+      rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
 
-    // IN THIS TEST
-    // WindowedStream2 should remember till 7 seconds: 10, 8,
+    // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
     // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
-    // MappedStream should remember till 7 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
+    // MappedStream should remember till 2 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
 
     // WindowedStream2
     assert(windowedStream2.generatedRDDs.contains(Time(10000)))
@@ -445,4 +438,37 @@ class BasicOperationsSuite extends TestSuiteBase {
     assert(mappedStream.generatedRDDs.contains(Time(2000)))
     assert(!mappedStream.generatedRDDs.contains(Time(1000)))
   }
+
+  test("rdd cleanup - updateStateByKey") {
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
+    }
+    val stateStream = runCleanupTest(
+      conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
+
+    assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
+    assert(stateStream.generatedRDDs.contains(Time(10000)))
+    assert(!stateStream.generatedRDDs.contains(Time(4000)))
+  }
+
+  /** Test cleanup of RDDs in DStream metadata */
+  def runCleanupTest[T: ClassTag](
+      conf2: SparkConf,
+      operation: DStream[Int] => DStream[T],
+      numExpectedOutput: Int = cleanupTestInput.size,
+      rememberDuration: Duration = null
+    ): DStream[T] = {
+
+    // Setup the stream computation
+    assert(batchDuration === Seconds(1),
+      "Batch duration has changed from 1 second, check cleanup tests")
+    val ssc = setupStreams(cleanupTestInput, operation)
+    val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
+    if (rememberDuration != null) ssc.remember(rememberDuration)
+    val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    assert(clock.time === Seconds(10).milliseconds)
+    assert(output.size === numExpectedOutput)
+    operatedStream
+  }
 }


[2/2] git commit: Merge pull request #409 from tdas/unpersist

Posted by pw...@apache.org.
Merge pull request #409 from tdas/unpersist

Automatically unpersisting RDDs that have been cleaned up from DStreams

Earlier, RDDs generated by DStreams were forgotten but not unpersisted. The system relied on the natural BlockManager LRU eviction to drop the data. The cleaner.ttl setting was a hammer to clean up RDDs, but it is something that needs to be set separately and needs to be set very conservatively (at best, a few minutes). This automatic unpersisting allows the system to handle this automatically, which reduces memory usage. As a side effect, it will also improve GC performance, as there are fewer objects stored in memory. In fact, for some workloads, it may allow RDDs to be cached as deserialized, which speeds up processing without too much GC overhead.

This is disabled by default. To enable it, set the configuration spark.streaming.unpersist to true. In a future release, this will be set to true by default.

Also, reduced the sleep time in TaskSchedulerImpl.stop() from 5 seconds to 1 second. From my conversation with Matei, there does not seem to be any good reason for the sleep that lets messages be sent out to be so long.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/08b9fec9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/08b9fec9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/08b9fec9

Branch: refs/heads/master
Commit: 08b9fec93d00ff0ebb49af4d9ac72d2806eded02
Parents: b07bc02 27311b1
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 13 22:29:03 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 13 22:29:03 2014 -0800

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  5 +-
 .../streaming/api/java/JavaDStreamLike.scala    |  3 +-
 .../spark/streaming/dstream/DStream.scala       | 11 ++-
 .../dstream/DStreamCheckpointData.scala         |  2 +-
 .../spark/streaming/BasicOperationsSuite.scala  | 72 +++++++++++++-------
 5 files changed, 63 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/08b9fec9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------