Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/13 21:18:23 UTC

[6/9] git commit: Fixed persistence logic of WindowedDStream, and fixed default persistence level of input streams.

Fixed persistence logic of WindowedDStream, and fixed default persistence level of input streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/034f89aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/034f89aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/034f89aa

Branch: refs/heads/master
Commit: 034f89aaab1db95e8908432f2445d6841526efcf
Parents: 74d0126
Author: Tathagata Das <ta...@gmail.com>
Authored: Sun Jan 12 19:02:27 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 12 19:02:27 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/flume/FlumeUtils.scala  |  1 +
 .../apache/spark/streaming/kafka/KafkaUtils.scala  |  3 ++-
 .../apache/spark/streaming/mqtt/MQTTUtils.scala    |  1 +
 .../spark/streaming/twitter/TwitterUtils.scala     |  4 ++++
 .../org/apache/spark/streaming/DStreamGraph.scala  |  2 +-
 .../apache/spark/streaming/StreamingContext.scala  |  5 +++--
 .../streaming/api/java/JavaStreamingContext.scala  |  4 ++--
 .../spark/streaming/dstream/WindowedDStream.scala  | 17 +++++++++++++----
 .../spark/streaming/WindowOperationsSuite.scala    | 14 ++++++++++++++
 9 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
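
In short: WindowedDStream no longer persists its own window RDDs; a call to
persist() is forwarded to the parent DStream, and receiver-based input streams
now default to StorageLevel.MEMORY_AND_DISK_SER_2. A minimal sketch of the new
behavior (host, port, and durations below are illustrative, not from this
commit):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowPersist")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999)
    val windowed = lines.window(Seconds(30), Seconds(10))
    // Window RDDs are unions over the parent's RDDs, so caching them would
    // duplicate the underlying data; persist() now applies to `lines` instead.
    windowed.persist(StorageLevel.MEMORY_ONLY)
    windowed.count().print()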


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index a01c17a..a6af53e 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -43,6 +43,7 @@ object FlumeUtils {
 
   /**
   * Creates an input stream from a Flume source.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
   * @param hostname Hostname of the slave machine to which the Flume data will be sent
   * @param port     Port of the slave machine to which the Flume data will be sent
    */
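
A minimal sketch of how the documented default applies, assuming the
FlumeUtils.createStream(ssc, hostname, port) overload of this era (host and
port are illustrative):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val ssc = new StreamingContext("local[2]", "FlumeExample", Seconds(1))
    // No storage level given, so the received events are stored at the
    // documented default, StorageLevel.MEMORY_AND_DISK_SER_2.
    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 33333)
    flumeStream.count().print()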

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index df4ecac..76f9c46 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -78,6 +78,7 @@ object KafkaUtils {
 
   /**
   * Create an input stream that pulls messages from a Kafka broker.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc      JavaStreamingContext object
    * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
    * @param groupId   The group id for this consumer
@@ -128,7 +129,7 @@ object KafkaUtils {
    *                    see http://kafka.apache.org/08/configuration.html
    * @param topics  Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    *                in its own thread
-   * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2.
+   * @param storageLevel RDD storage level.
    */
   def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
       jssc: JavaStreamingContext,
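
A minimal sketch of the Scala counterpart of this Java wrapper, assuming the
KafkaUtils.createStream(ssc, zkQuorum, groupId, topics) overload (quorum,
group id, and topic name are illustrative):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext("local[2]", "KafkaExample", Seconds(1))
    // Omitting the storage level picks up the documented default,
    // StorageLevel.MEMORY_AND_DISK_SER_2.
    val kafkaStream = KafkaUtils.createStream(
      ssc, "zk1:2181,zk2:2181", "example-group", Map("events" -> 1))
    kafkaStream.map(_._2).count().print()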

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index eacb26f..caa86b2 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -44,6 +44,7 @@ object MQTTUtils {
 
   /**
   * Create an input stream that receives messages pushed by an MQTT publisher.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc      JavaStreamingContext object
    * @param brokerUrl Url of remote MQTT publisher
    * @param topic     Topic name to subscribe to
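
A minimal sketch under the same assumption, using the Scala
MQTTUtils.createStream(ssc, brokerUrl, topic) overload (broker URL and topic
are illustrative):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.mqtt.MQTTUtils

    val ssc = new StreamingContext("local[2]", "MQTTExample", Seconds(1))
    // With no explicit storage level, messages are stored at the documented
    // default, StorageLevel.MEMORY_AND_DISK_SER_2.
    val mqttStream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "sensors/temp")
    mqttStream.print()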

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 8ea52c4..a23d685 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -51,6 +51,7 @@ object TwitterUtils {
    * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
    * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    * twitter4j.oauth.accessTokenSecret.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc   JavaStreamingContext object
    */
   def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
@@ -62,6 +63,7 @@ object TwitterUtils {
    * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
    * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    * twitter4j.oauth.accessTokenSecret.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc    JavaStreamingContext object
    * @param filters Set of filter strings to get only those tweets that match them
    */
@@ -88,6 +90,7 @@ object TwitterUtils {
 
   /**
   * Create an input stream that returns tweets received from Twitter.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc        JavaStreamingContext object
    * @param twitterAuth Twitter4J Authorization
    */
@@ -97,6 +100,7 @@ object TwitterUtils {
 
   /**
   * Create an input stream that returns tweets received from Twitter.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc        JavaStreamingContext object
    * @param twitterAuth Twitter4J Authorization
    * @param filters     Set of filter strings to get only those tweets that match them
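
A minimal sketch of the Scala API these Java wrappers delegate to (the filter
string is illustrative; credentials are read from the twitter4j.oauth.* system
properties mentioned above):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.twitter.TwitterUtils

    val ssc = new StreamingContext("local[2]", "TwitterExample", Seconds(1))
    // No storage level argument, so the documented default
    // StorageLevel.MEMORY_AND_DISK_SER_2 applies.
    val tweets = TwitterUtils.createStream(ssc, None, Seq("#spark"))
    tweets.map(_.getText).print()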

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 31038a0..8faa79f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -78,7 +78,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def remember(duration: Duration) {
     this.synchronized {
       if (rememberDuration != null) {
-        throw new Exception("Batch duration already set as " + batchDuration +
+        throw new Exception("Remember duration already set as " + batchDuration +
           ". cannot set it again.")
       }
       rememberDuration = duration
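
The corrected message guards StreamingContext.remember, which may be set only
once per context; a minimal sketch of the call (the duration is illustrative):

    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "RememberExample", Seconds(1))
    // Keep generated RDDs around for at least one minute; a second call to
    // remember() would hit the (now correctly worded) exception above.
    ssc.remember(Minutes(1))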

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ee83ae9..7b27933 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -168,7 +168,7 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Set the context to periodically checkpoint the DStream operations for master
+   * Set the context to periodically checkpoint the DStream operations for driver
    * fault-tolerance.
    * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
    *                  Note that this must be a fault-tolerant file system like HDFS for
@@ -220,7 +220,7 @@ class StreamingContext private[streaming] (
   def actorStream[T: ClassTag](
       props: Props,
       name: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
     ): DStream[T] = {
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
@@ -272,6 +272,7 @@ class StreamingContext private[streaming] (
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    * @param storageLevel  Storage level to use for storing the received objects
+   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    * @tparam T            Type of the objects in the received blocks
    */
   def rawSocketStream[T: ClassTag](
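
A minimal sketch of the checkpointing call whose doc comment is corrected
above (the HDFS path is illustrative):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "CheckpointExample", Seconds(1))
    // Checkpoint DStream metadata to a fault-tolerant file system so the
    // driver can recover from failure.
    ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints")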

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index a9d605d..a2f0b88 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -151,7 +151,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    * @param storageLevel  Storage level to use for storing the received objects
-   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
   def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
   : JavaDStream[String] = {
@@ -161,7 +160,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   /**
   * Create an input stream from a network source hostname:port. Data is received using
   * a TCP socket and the received bytes are interpreted as UTF8 encoded \n delimited
-   * lines.
+   * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    */
@@ -302,6 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
    * Create an input stream with any arbitrary user implemented actor receiver.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
    *
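
A brief sketch of the two-argument socketTextStream overload the updated doc
comment describes, calling JavaStreamingContext from Scala (host and port are
illustrative):

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.api.java.JavaStreamingContext

    val jssc = new JavaStreamingContext("local[2]", "JavaSocketExample", Seconds(1))
    // Without a storage level argument, received lines are stored at the
    // default StorageLevel.MEMORY_AND_DISK_SER_2.
    val lines = jssc.socketTextStream("localhost", 9999)
    lines.print()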

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 89c43ff..6301772 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -32,13 +32,14 @@ class WindowedDStream[T: ClassTag](
   extends DStream[T](parent.ssc) {
 
   if (!_windowDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " +
+    "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
   if (!_slideDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
+    "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
+  // Persist the parent DStream by default, as its RDDs are obviously going to be reused.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
   def windowDuration: Duration =  _windowDuration
@@ -49,6 +50,14 @@ class WindowedDStream[T: ClassTag](
 
   override def parentRememberDuration: Duration = rememberDuration + windowDuration
 
+  override def persist(level: StorageLevel): DStream[T] = {
+  // Do not let this windowed DStream be persisted, as windowed (unioned) RDDs share underlying
+  // RDDs, and persisting the windowed RDDs would store numerous copies of the underlying data.
+    // Instead control the persistence of the parent DStream.
+    parent.persist(level)
+    this
+  }
+
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 8f3c2dd..471c99f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming
 
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.storage.StorageLevel
 
 class WindowOperationsSuite extends TestSuiteBase {
 
@@ -144,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seconds(3)
   )
 
+  test("window - persistence level") {
+    val input = Seq(Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
+    val ssc = new StreamingContext(conf, batchDuration)
+    val inputStream = new TestInputStream[Int](ssc, input, 1)
+    val windowStream1 = inputStream.window(batchDuration * 2)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER)
+    windowStream1.persist(StorageLevel.MEMORY_ONLY)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY)
+    ssc.stop()
+  }
+
   // Testing naive reduceByKeyAndWindow (without invertible function)
 
   testReduceByKeyAndWindow(