You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/13 21:18:18 UTC

[1/9] Moved DStream, DStreamCheckpointData and PairDStream from org.apache.spark.streaming to org.apache.spark.streaming.dstream.

Updated Branches:
  refs/heads/master e6ed13f25 -> b93f9d42f


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index f670f65..475569c 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -24,8 +24,9 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark._
 import org.apache.spark.api.java._
 import org.apache.spark.rdd.{RDD, DoubleRDDFunctions, PairRDDFunctions, OrderedRDDFunctions}
-import org.apache.spark.streaming.{PairDStreamFunctions, DStream, StreamingContext}
+import org.apache.spark.streaming.{StreamingContext}
 import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
 
 
 private[spark] abstract class SparkType(val name: String)
@@ -147,7 +148,7 @@ object JavaAPICompletenessChecker {
               } else {
                 ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs))
               }
-            case "org.apache.spark.streaming.DStream" =>
+            case "org.apache.spark.streaming.dstream.DStream" =>
               if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
                 val tupleParams =
                   parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
@@ -248,30 +249,29 @@ object JavaAPICompletenessChecker {
       "org.apache.spark.SparkContext.getSparkHome",
       "org.apache.spark.SparkContext.executorMemoryRequested",
       "org.apache.spark.SparkContext.getExecutorStorageStatus",
-      "org.apache.spark.streaming.DStream.generatedRDDs",
-      "org.apache.spark.streaming.DStream.zeroTime",
-      "org.apache.spark.streaming.DStream.rememberDuration",
-      "org.apache.spark.streaming.DStream.storageLevel",
-      "org.apache.spark.streaming.DStream.mustCheckpoint",
-      "org.apache.spark.streaming.DStream.checkpointDuration",
-      "org.apache.spark.streaming.DStream.checkpointData",
-      "org.apache.spark.streaming.DStream.graph",
-      "org.apache.spark.streaming.DStream.isInitialized",
-      "org.apache.spark.streaming.DStream.parentRememberDuration",
-      "org.apache.spark.streaming.DStream.initialize",
-      "org.apache.spark.streaming.DStream.validate",
-      "org.apache.spark.streaming.DStream.setContext",
-      "org.apache.spark.streaming.DStream.setGraph",
-      "org.apache.spark.streaming.DStream.remember",
-      "org.apache.spark.streaming.DStream.getOrCompute",
-      "org.apache.spark.streaming.DStream.generateJob",
-      "org.apache.spark.streaming.DStream.clearOldMetadata",
-      "org.apache.spark.streaming.DStream.addMetadata",
-      "org.apache.spark.streaming.DStream.updateCheckpointData",
-      "org.apache.spark.streaming.DStream.restoreCheckpointData",
-      "org.apache.spark.streaming.DStream.isTimeValid",
+      "org.apache.spark.streaming.dstream.DStream.generatedRDDs",
+      "org.apache.spark.streaming.dstream.DStream.zeroTime",
+      "org.apache.spark.streaming.dstream.DStream.rememberDuration",
+      "org.apache.spark.streaming.dstream.DStream.storageLevel",
+      "org.apache.spark.streaming.dstream.DStream.mustCheckpoint",
+      "org.apache.spark.streaming.dstream.DStream.checkpointDuration",
+      "org.apache.spark.streaming.dstream.DStream.checkpointData",
+      "org.apache.spark.streaming.dstream.DStream.graph",
+      "org.apache.spark.streaming.dstream.DStream.isInitialized",
+      "org.apache.spark.streaming.dstream.DStream.parentRememberDuration",
+      "org.apache.spark.streaming.dstream.DStream.initialize",
+      "org.apache.spark.streaming.dstream.DStream.validate",
+      "org.apache.spark.streaming.dstream.DStream.setContext",
+      "org.apache.spark.streaming.dstream.DStream.setGraph",
+      "org.apache.spark.streaming.dstream.DStream.remember",
+      "org.apache.spark.streaming.dstream.DStream.getOrCompute",
+      "org.apache.spark.streaming.dstream.DStream.generateJob",
+      "org.apache.spark.streaming.dstream.DStream.clearOldMetadata",
+      "org.apache.spark.streaming.dstream.DStream.addMetadata",
+      "org.apache.spark.streaming.dstream.DStream.updateCheckpointData",
+      "org.apache.spark.streaming.dstream.DStream.restoreCheckpointData",
+      "org.apache.spark.streaming.dstream.DStream.isTimeValid",
       "org.apache.spark.streaming.StreamingContext.nextNetworkInputStreamId",
-      "org.apache.spark.streaming.StreamingContext.networkInputTracker",
       "org.apache.spark.streaming.StreamingContext.checkpointDir",
       "org.apache.spark.streaming.StreamingContext.checkpointDuration",
       "org.apache.spark.streaming.StreamingContext.receiverJobThread",


[5/9] git commit: Merge remote-tracking branch 'apache/master' into dstream-move

Posted by pw...@apache.org.
Merge remote-tracking branch 'apache/master' into dstream-move


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/74d01262
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/74d01262
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/74d01262

Branch: refs/heads/master
Commit: 74d0126257838f29e3fad519b9f1a5acde88bef6
Parents: d1820fe 074f502
Author: Tathagata Das <ta...@gmail.com>
Authored: Sun Jan 12 18:02:05 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 12 18:02:05 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/executor/Executor.scala   |  2 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala    |  4 +++-
 .../org/apache/spark/scheduler/SparkListener.scala   |  3 +++
 .../apache/spark/scheduler/SparkListenerBus.scala    | 15 +++++++++++----
 4 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------



[6/9] git commit: Fixed persistence logic of WindowedDStream, and fixed default persistence level of input streams.

Posted by pw...@apache.org.
Fixed persistence logic of WindowedDStream, and fixed default persistence level of input streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/034f89aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/034f89aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/034f89aa

Branch: refs/heads/master
Commit: 034f89aaab1db95e8908432f2445d6841526efcf
Parents: 74d0126
Author: Tathagata Das <ta...@gmail.com>
Authored: Sun Jan 12 19:02:27 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 12 19:02:27 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/flume/FlumeUtils.scala  |  1 +
 .../apache/spark/streaming/kafka/KafkaUtils.scala  |  3 ++-
 .../apache/spark/streaming/mqtt/MQTTUtils.scala    |  1 +
 .../spark/streaming/twitter/TwitterUtils.scala     |  4 ++++
 .../org/apache/spark/streaming/DStreamGraph.scala  |  2 +-
 .../apache/spark/streaming/StreamingContext.scala  |  5 +++--
 .../streaming/api/java/JavaStreamingContext.scala  |  4 ++--
 .../spark/streaming/dstream/WindowedDStream.scala  | 17 +++++++++++++----
 .../spark/streaming/WindowOperationsSuite.scala    | 14 ++++++++++++++
 9 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index a01c17a..a6af53e 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -43,6 +43,7 @@ object FlumeUtils {
 
   /**
    * Creates a input stream from a Flume source.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port     Port of the slave machine to which the flume data will be sent
    */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index df4ecac..76f9c46 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -78,6 +78,7 @@ object KafkaUtils {
 
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc      JavaStreamingContext object
    * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
    * @param groupId   The group id for this consumer
@@ -128,7 +129,7 @@ object KafkaUtils {
    *                    see http://kafka.apache.org/08/configuration.html
    * @param topics  Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    *                in its own thread
-   * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2.
+   * @param storageLevel RDD storage level.
    */
   def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
       jssc: JavaStreamingContext,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index eacb26f..caa86b2 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -44,6 +44,7 @@ object MQTTUtils {
 
   /**
    * Create an input stream that receives messages pushed by a MQTT publisher.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc      JavaStreamingContext object
    * @param brokerUrl Url of remote MQTT publisher
    * @param topic     Topic name to subscribe to

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 8ea52c4..a23d685 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -51,6 +51,7 @@ object TwitterUtils {
    * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
    * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    * twitter4j.oauth.accessTokenSecret.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc   JavaStreamingContext object
    */
   def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
@@ -62,6 +63,7 @@ object TwitterUtils {
    * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
    * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    * twitter4j.oauth.accessTokenSecret.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc    JavaStreamingContext object
    * @param filters Set of filter strings to get only those tweets that match them
    */
@@ -88,6 +90,7 @@ object TwitterUtils {
 
   /**
    * Create a input stream that returns tweets received from Twitter.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc        JavaStreamingContext object
    * @param twitterAuth Twitter4J Authorization
    */
@@ -97,6 +100,7 @@ object TwitterUtils {
 
   /**
    * Create a input stream that returns tweets received from Twitter.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc        JavaStreamingContext object
    * @param twitterAuth Twitter4J Authorization
    * @param filters     Set of filter strings to get only those tweets that match them

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 31038a0..8faa79f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -78,7 +78,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def remember(duration: Duration) {
     this.synchronized {
       if (rememberDuration != null) {
-        throw new Exception("Batch duration already set as " + batchDuration +
+        throw new Exception("Remember duration already set as " + rememberDuration +
           ". cannot set it again.")
       }
       rememberDuration = duration

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ee83ae9..7b27933 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -168,7 +168,7 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Set the context to periodically checkpoint the DStream operations for master
+   * Set the context to periodically checkpoint the DStream operations for driver
    * fault-tolerance.
    * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
    *                  Note that this must be a fault-tolerant file system like HDFS for
@@ -220,7 +220,7 @@ class StreamingContext private[streaming] (
   def actorStream[T: ClassTag](
       props: Props,
       name: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
     ): DStream[T] = {
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
@@ -272,6 +272,7 @@ class StreamingContext private[streaming] (
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    * @param storageLevel  Storage level to use for storing the received objects
+   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    * @tparam T            Type of the objects in the received blocks
    */
   def rawSocketStream[T: ClassTag](

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index a9d605d..a2f0b88 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -151,7 +151,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    * @param storageLevel  Storage level to use for storing the received objects
-   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
   def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
   : JavaDStream[String] = {
@@ -161,7 +160,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   /**
    * Create a input stream from network source hostname:port. Data is received using
    * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
-   * lines.
+   * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    */
@@ -302,6 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
    * Create an input stream with any arbitrary user implemented actor receiver.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
    *

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 89c43ff..6301772 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -32,13 +32,14 @@ class WindowedDStream[T: ClassTag](
   extends DStream[T](parent.ssc) {
 
   if (!_windowDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The window duration of windowed DStream (" + _windowDuration + ") " +
+    "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
   if (!_slideDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
+    "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
+  // Persist parent level by default, as those RDDs are going to be obviously reused.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
   def windowDuration: Duration =  _windowDuration
@@ -49,6 +50,14 @@ class WindowedDStream[T: ClassTag](
 
   override def parentRememberDuration: Duration = rememberDuration + windowDuration
 
+  override def persist(level: StorageLevel): DStream[T] = {
+    // Do not let this windowed DStream be persisted as windowed (union-ed) RDDs share underlying
+    // RDDs and persisting the windowed RDDs would store numerous copies of the underlying data.
+    // Instead control the persistence of the parent DStream.
+    parent.persist(level)
+    this
+  }
+
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/034f89aa/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 8f3c2dd..471c99f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming
 
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.storage.StorageLevel
 
 class WindowOperationsSuite extends TestSuiteBase {
 
@@ -144,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seconds(3)
   )
 
+  test("window - persistence level") {
+    val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
+    val ssc = new StreamingContext(conf, batchDuration)
+    val inputStream = new TestInputStream[Int](ssc, input, 1)
+    val windowStream1 = inputStream.window(batchDuration * 2)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER)
+    windowStream1.persist(StorageLevel.MEMORY_ONLY)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY)
+    ssc.stop()
+  }
+
   // Testing naive reduceByKeyAndWindow (without invertible function)
 
   testReduceByKeyAndWindow(


[3/9] git commit: Moved DStream, DStreamCheckpointData and PairDStream from org.apache.spark.streaming to org.apache.spark.streaming.dstream.

Posted by pw...@apache.org.
Moved DStream, DStreamCheckpointData and PairDStream from org.apache.spark.streaming to org.apache.spark.streaming.dstream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/448aef67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/448aef67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/448aef67

Branch: refs/heads/master
Commit: 448aef6790caa3728bcc43f518afb69807597c39
Parents: c5921e5
Author: Tathagata Das <ta...@gmail.com>
Authored: Sun Jan 12 11:31:54 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 12 11:31:54 2014 -0800

----------------------------------------------------------------------
 docs/streaming-programming-guide.md             |   2 +-
 .../spark/streaming/flume/FlumeUtils.scala      |   3 +-
 .../spark/streaming/kafka/KafkaUtils.scala      |   3 +-
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |   3 +-
 .../spark/streaming/twitter/TwitterUtils.scala  |   3 +-
 .../spark/streaming/zeromq/ZeroMQUtils.scala    |   3 +-
 .../org/apache/spark/streaming/DStream.scala    | 741 ------------------
 .../spark/streaming/DStreamCheckpointData.scala | 128 ----
 .../apache/spark/streaming/DStreamGraph.scala   |   4 +-
 .../spark/streaming/PairDStreamFunctions.scala  | 621 ----------------
 .../spark/streaming/api/java/JavaDStream.scala  |   3 +-
 .../streaming/api/java/JavaDStreamLike.scala    |   1 +
 .../streaming/api/java/JavaPairDStream.scala    |   1 +
 .../api/java/JavaStreamingContext.scala         |   1 +
 .../spark/streaming/dstream/DStream.scala       | 742 +++++++++++++++++++
 .../dstream/DStreamCheckpointData.scala         | 126 ++++
 .../streaming/dstream/FileInputDStream.scala    |   2 +-
 .../streaming/dstream/FilteredDStream.scala     |   2 +-
 .../dstream/FlatMapValuedDStream.scala          |   2 +-
 .../streaming/dstream/FlatMappedDStream.scala   |   2 +-
 .../streaming/dstream/ForEachDStream.scala      |   2 +-
 .../streaming/dstream/GlommedDStream.scala      |   2 +-
 .../spark/streaming/dstream/InputDStream.scala  |   2 +-
 .../dstream/MapPartitionedDStream.scala         |   2 +-
 .../streaming/dstream/MapValuedDStream.scala    |   2 +-
 .../spark/streaming/dstream/MappedDStream.scala |   2 +-
 .../dstream/PairDStreamFunctions.scala          | 622 ++++++++++++++++
 .../dstream/ReducedWindowedDStream.scala        |   2 +-
 .../streaming/dstream/ShuffledDStream.scala     |   2 +-
 .../spark/streaming/dstream/StateDStream.scala  |   2 +-
 .../streaming/dstream/TransformedDStream.scala  |   2 +-
 .../spark/streaming/dstream/UnionDStream.scala  |   3 +-
 .../streaming/util/MasterFailureTest.scala      |   2 +-
 .../spark/streaming/BasicOperationsSuite.scala  |   1 +
 .../spark/streaming/CheckpointSuite.scala       |   2 +-
 .../spark/streaming/StreamingContextSuite.scala |   1 +
 .../streaming/StreamingListenerSuite.scala      |   1 +
 .../apache/spark/streaming/TestSuiteBase.scala  |   2 +-
 .../spark/streaming/WindowOperationsSuite.scala |   1 +
 .../tools/JavaAPICompletenessChecker.scala      |  50 +-
 40 files changed, 1555 insertions(+), 1543 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 1c9ece6..cec1b75 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -167,7 +167,7 @@ Spark Streaming features windowed computations, which allow you to apply transfo
 </tr>
 </table>
 
-A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.DStream) and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.PairDStreamFunctions).
+A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.dstream.DStream) and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
 
 ## Output Operations
 When an output operator is called, it triggers the computation of a stream. Currently the following output operators are defined:

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 834b775..a01c17a 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.streaming.flume
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.{StreamingContext}
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.DStream
 
 object FlumeUtils {
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index c2d851f..df4ecac 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -26,8 +26,9 @@ import java.util.{Map => JMap}
 import kafka.serializer.{Decoder, StringDecoder}
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.{StreamingContext}
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.DStream
 
 
 object KafkaUtils {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 0e6c25d..eacb26f 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.streaming.mqtt
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.{StreamingContext}
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
 import scala.reflect.ClassTag
+import org.apache.spark.streaming.dstream.DStream
 
 object MQTTUtils {
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 5e506ff..8ea52c4 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -20,8 +20,9 @@ package org.apache.spark.streaming.twitter
 import twitter4j.Status
 import twitter4j.auth.Authorization
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.{StreamingContext}
 import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.DStream
 
 object TwitterUtils {
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 546d9df..669eb0d 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -25,8 +25,9 @@ import akka.zeromq.Subscribe
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.{StreamingContext}
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.DStream
 
 object ZeroMQUtils {
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
deleted file mode 100644
index d59146e..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ /dev/null
@@ -1,741 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import StreamingContext._
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
-
-import scala.collection.mutable.HashMap
-import scala.reflect.ClassTag
-
-import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
-
-
-/**
- * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
- * for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
- * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
- * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
- * by a parent DStream.
- *
- * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
- * `window`. In addition, [[org.apache.spark.streaming.PairDStreamFunctions]] contains operations available
- * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
- * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
- * implicit conversions when `spark.streaming.StreamingContext._` is imported.
- *
- * DStreams internally is characterized by a few basic properties:
- *  - A list of other DStreams that the DStream depends on
- *  - A time interval at which the DStream generates an RDD
- *  - A function that is used to generate an RDD after each time interval
- */
-
-abstract class DStream[T: ClassTag] (
-    @transient private[streaming] var ssc: StreamingContext
-  ) extends Serializable with Logging {
-
-  // =======================================================================
-  // Methods that should be implemented by subclasses of DStream
-  // =======================================================================
-
-  /** Time interval after which the DStream generates a RDD */
-  def slideDuration: Duration
-
-  /** List of parent DStreams on which this DStream depends on */
-  def dependencies: List[DStream[_]]
-
-  /** Method that generates a RDD for the given time */
-  def compute (validTime: Time): Option[RDD[T]]
-
-  // =======================================================================
-  // Methods and fields available on all DStreams
-  // =======================================================================
-
-  // RDDs generated, marked as private[streaming] so that testsuites can access it
-  @transient
-  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
-
-  // Time zero for the DStream
-  private[streaming] var zeroTime: Time = null
-
-  // Duration for which the DStream will remember each RDD created
-  private[streaming] var rememberDuration: Duration = null
-
-  // Storage level of the RDDs in the stream
-  private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
-
-  // Checkpoint details
-  private[streaming] val mustCheckpoint = false
-  private[streaming] var checkpointDuration: Duration = null
-  private[streaming] val checkpointData = new DStreamCheckpointData(this)
-
-  // Reference to whole DStream graph
-  private[streaming] var graph: DStreamGraph = null
-
-  private[streaming] def isInitialized = (zeroTime != null)
-
-  // Duration for which the DStream requires its parent DStream to remember each RDD created
-  private[streaming] def parentRememberDuration = rememberDuration
-
-  /** Return the StreamingContext associated with this DStream */
-  def context = ssc
-
-  /** Persist the RDDs of this DStream with the given storage level */
-  def persist(level: StorageLevel): DStream[T] = {
-    if (this.isInitialized) {
-      throw new UnsupportedOperationException(
-        "Cannot change storage level of an DStream after streaming context has started")
-    }
-    this.storageLevel = level
-    this
-  }
-
-  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
-  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
-
-  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
-  def cache(): DStream[T] = persist()
-
-  /**
-   * Enable periodic checkpointing of RDDs of this DStream
-   * @param interval Time interval after which generated RDD will be checkpointed
-   */
-  def checkpoint(interval: Duration): DStream[T] = {
-    if (isInitialized) {
-      throw new UnsupportedOperationException(
-        "Cannot change checkpoint interval of an DStream after streaming context has started")
-    }
-    persist()
-    checkpointDuration = interval
-    this
-  }
-
-  /**
-   * Initialize the DStream by setting the "zero" time, based on which
-   * the validity of future times is calculated. This method also recursively initializes
-   * its parent DStreams.
-   */
-  private[streaming] def initialize(time: Time) {
-    if (zeroTime != null && zeroTime != time) {
-      throw new Exception("ZeroTime is already initialized to " + zeroTime
-        + ", cannot initialize it again to " + time)
-    }
-    zeroTime = time
-
-    // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
-    if (mustCheckpoint && checkpointDuration == null) {
-      checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
-      logInfo("Checkpoint interval automatically set to " + checkpointDuration)
-    }
-
-    // Set the minimum value of the rememberDuration if not already set
-    var minRememberDuration = slideDuration
-    if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
-      minRememberDuration = checkpointDuration * 2  // times 2 just to be sure that the latest checkpoint is not forgetten
-    }
-    if (rememberDuration == null || rememberDuration < minRememberDuration) {
-      rememberDuration = minRememberDuration
-    }
-
-    // Initialize the dependencies
-    dependencies.foreach(_.initialize(zeroTime))
-  }
-
-  private[streaming] def validate() {
-    assert(rememberDuration != null, "Remember duration is set to null")
-
-    assert(
-      !mustCheckpoint || checkpointDuration != null,
-      "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
-        " Please use DStream.checkpoint() to set the interval."
-    )
-
-    assert(
-     checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
-      "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
-      " or SparkContext.checkpoint() to set the checkpoint directory."
-    )
-
-    assert(
-      checkpointDuration == null || checkpointDuration >= slideDuration,
-      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
-        checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
-        "Please set it to at least " + slideDuration + "."
-    )
-
-    assert(
-      checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
-      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
-        checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
-        "Please set it to a multiple " + slideDuration + "."
-    )
-
-    assert(
-      checkpointDuration == null || storageLevel != StorageLevel.NONE,
-      "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
-        "level has not been set to enable persisting. Please use DStream.persist() to set the " +
-        "storage level to use memory for better checkpointing performance."
-    )
-
-    assert(
-      checkpointDuration == null || rememberDuration > checkpointDuration,
-      "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
-        rememberDuration + " which is not more than the checkpoint interval (" +
-        checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
-    )
-
-    val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
-    logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
-    assert(
-      metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
-      "It seems you are doing some DStream window operation or setting a checkpoint interval " +
-        "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
-        "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
-        "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
-        "set the Java property 'spark.cleaner.delay' to more than " +
-        math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
-    )
-
-    dependencies.foreach(_.validate())
-
-    logInfo("Slide time = " + slideDuration)
-    logInfo("Storage level = " + storageLevel)
-    logInfo("Checkpoint interval = " + checkpointDuration)
-    logInfo("Remember duration = " + rememberDuration)
-    logInfo("Initialized and validated " + this)
-  }
-
-  private[streaming] def setContext(s: StreamingContext) {
-    if (ssc != null && ssc != s) {
-      throw new Exception("Context is already set in " + this + ", cannot set it again")
-    }
-    ssc = s
-    logInfo("Set context for " + this)
-    dependencies.foreach(_.setContext(ssc))
-  }
-
-  private[streaming] def setGraph(g: DStreamGraph) {
-    if (graph != null && graph != g) {
-      throw new Exception("Graph is already set in " + this + ", cannot set it again")
-    }
-    graph = g
-    dependencies.foreach(_.setGraph(graph))
-  }
-
-  private[streaming] def remember(duration: Duration) {
-    if (duration != null && duration > rememberDuration) {
-      rememberDuration = duration
-      logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
-    }
-    dependencies.foreach(_.remember(parentRememberDuration))
-  }
-
-  /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
-  private[streaming] def isTimeValid(time: Time): Boolean = {
-    if (!isInitialized) {
-      throw new Exception (this + " has not been initialized")
-    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
-      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
-      false
-    } else {
-      logDebug("Time " + time + " is valid")
-      true
-    }
-  }
-
-  /**
-   * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
-   * method that should not be called directly.
-   */
-  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
-    // If this DStream was not initialized (i.e., zeroTime not set), then do it
-    // If RDD was already generated, then retrieve it from HashMap
-    generatedRDDs.get(time) match {
-
-      // If an RDD was already generated and is being reused, then
-      // probably all RDDs in this DStream will be reused and hence should be cached
-      case Some(oldRDD) => Some(oldRDD)
-
-      // if RDD was not generated, and if the time is valid
-      // (based on sliding time of this DStream), then generate the RDD
-      case None => {
-        if (isTimeValid(time)) {
-          compute(time) match {
-            case Some(newRDD) =>
-              if (storageLevel != StorageLevel.NONE) {
-                newRDD.persist(storageLevel)
-                logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
-              }
-              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
-                newRDD.checkpoint()
-                logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
-              }
-              generatedRDDs.put(time, newRDD)
-              Some(newRDD)
-            case None =>
-              None
-          }
-        } else {
-          None
-        }
-      }
-    }
-  }
-
-  /**
-   * Generate a SparkStreaming job for the given time. This is an internal method that
-   * should not be called directly. This default implementation creates a job
-   * that materializes the corresponding RDD. Subclasses of DStream may override this
-   * to generate their own jobs.
-   */
-  private[streaming] def generateJob(time: Time): Option[Job] = {
-    getOrCompute(time) match {
-      case Some(rdd) => {
-        val jobFunc = () => {
-          val emptyFunc = { (iterator: Iterator[T]) => {} }
-          context.sparkContext.runJob(rdd, emptyFunc)
-        }
-        Some(new Job(time, jobFunc))
-      }
-      case None => None
-    }
-  }
-
-  /**
-   * Clear metadata that are older than `rememberDuration` of this DStream.
-   * This is an internal method that should not be called directly. This default
-   * implementation clears the old generated RDDs. Subclasses of DStream may override
-   * this to clear their own metadata along with the generated RDDs.
-   */
-  private[streaming] def clearMetadata(time: Time) {
-    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
-    generatedRDDs --= oldRDDs.keys
-    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
-      (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
-    dependencies.foreach(_.clearMetadata(time))
-  }
-
-  /* Adds metadata to the Stream while it is running.
-   * This method should be overwritten by sublcasses of InputDStream.
-   */
-  private[streaming] def addMetadata(metadata: Any) {
-    if (metadata != null) {
-      logInfo("Dropping Metadata: " + metadata.toString)
-    }
-  }
-
-  /**
-   * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
-   * this stream. This is an internal method that should not be called directly. This is
-   * a default implementation that saves only the file names of the checkpointed RDDs to
-   * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
-   * this method to save custom checkpoint data.
-   */
-  private[streaming] def updateCheckpointData(currentTime: Time) {
-    logInfo("Updating checkpoint data for time " + currentTime)
-    checkpointData.update(currentTime)
-    dependencies.foreach(_.updateCheckpointData(currentTime))
-    logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
-  }
-
-  private[streaming] def clearCheckpointData(time: Time) {
-    logInfo("Clearing checkpoint data")
-    checkpointData.cleanup(time)
-    dependencies.foreach(_.clearCheckpointData(time))
-    logInfo("Cleared checkpoint data")
-  }
-
-  /**
-   * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
-   * that should not be called directly. This is a default implementation that recreates RDDs
-   * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
-   * override the updateCheckpointData() method would also need to override this method.
-   */
-  private[streaming] def restoreCheckpointData() {
-    // Create RDDs from the checkpoint data
-    logInfo("Restoring checkpoint data")
-    checkpointData.restore()
-    dependencies.foreach(_.restoreCheckpointData())
-    logInfo("Restored checkpoint data")
-  }
-
-  @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
-    logDebug(this.getClass().getSimpleName + ".writeObject used")
-    if (graph != null) {
-      graph.synchronized {
-        if (graph.checkpointInProgress) {
-          oos.defaultWriteObject()
-        } else {
-          val msg = "Object of " + this.getClass.getName + " is being serialized " +
-            " possibly as a part of closure of an RDD operation. This is because " +
-            " the DStream object is being referred to from within the closure. " +
-            " Please rewrite the RDD operation inside this DStream to avoid this. " +
-            " This has been enforced to avoid bloating of Spark tasks " +
-            " with unnecessary objects."
-          throw new java.io.NotSerializableException(msg)
-        }
-      }
-    } else {
-      throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
-    }
-  }
-
-  @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
-    logDebug(this.getClass().getSimpleName + ".readObject used")
-    ois.defaultReadObject()
-    generatedRDDs = new HashMap[Time, RDD[T]] ()
-  }
-
-  // =======================================================================
-  // DStream operations
-  // =======================================================================
-
-  /** Return a new DStream by applying a function to all elements of this DStream. */
-  def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
-    new MappedDStream(this, context.sparkContext.clean(mapFunc))
-  }
-
-  /**
-   * Return a new DStream by applying a function to all elements of this DStream,
-   * and then flattening the results
-   */
-  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
-    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
-  }
-
-  /** Return a new DStream containing only the elements that satisfy a predicate. */
-  def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
-
-  /**
-   * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
-   * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
-   * an array.
-   */
-  def glom(): DStream[Array[T]] = new GlommedDStream(this)
-
-
-  /**
-   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
-   * returned DStream has exactly numPartitions partitions.
-   */
-  def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions))
-
-  /**
-   * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
-   * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
-   * of the RDD.
-   */
-  def mapPartitions[U: ClassTag](
-      mapPartFunc: Iterator[T] => Iterator[U],
-      preservePartitioning: Boolean = false
-    ): DStream[U] = {
-    new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
-  }
-
-  /**
-   * Return a new DStream in which each RDD has a single element generated by reducing each RDD
-   * of this DStream.
-   */
-  def reduce(reduceFunc: (T, T) => T): DStream[T] =
-    this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
-
-  /**
-   * Return a new DStream in which each RDD has a single element generated by counting each RDD
-   * of this DStream.
-   */
-  def count(): DStream[Long] = {
-    this.map(_ => (null, 1L))
-        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
-        .reduceByKey(_ + _)
-        .map(_._2)
-  }
-
-  /**
-   * Return a new DStream in which each RDD contains the counts of each distinct value in
-   * each RDD of this DStream. Hash partitioning is used to generate
-   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
-   * `numPartitions` not specified).
-   */
-  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
-    this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
-
-  /**
-   * Apply a function to each RDD in this DStream. This is an output operator, so
-   * 'this' DStream will be registered as an output stream and therefore materialized.
-   */
-  def foreach(foreachFunc: RDD[T] => Unit) {
-    this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
-  }
-
-  /**
-   * Apply a function to each RDD in this DStream. This is an output operator, so
-   * 'this' DStream will be registered as an output stream and therefore materialized.
-   */
-  def foreach(foreachFunc: (RDD[T], Time) => Unit) {
-    ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
-  }
-
-  /**
-   * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of 'this' DStream.
-   */
-  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
-    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
-  }
-
-  /**
-   * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of 'this' DStream.
-   */
-  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
-    //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
-    val cleanedF = context.sparkContext.clean(transformFunc)
-    val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
-      assert(rdds.length == 1)
-      cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
-    }
-    new TransformedDStream[U](Seq(this), realTransformFunc)
-  }
-
-  /**
-   * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of 'this' DStream and 'other' DStream.
-   */
-  def transformWith[U: ClassTag, V: ClassTag](
-      other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
-    ): DStream[V] = {
-    val cleanedF = ssc.sparkContext.clean(transformFunc)
-    transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
-  }
-
-  /**
-   * Return a new DStream in which each RDD is generated by applying a function
-   * on each RDD of 'this' DStream and 'other' DStream.
-   */
-  def transformWith[U: ClassTag, V: ClassTag](
-      other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
-    ): DStream[V] = {
-    val cleanedF = ssc.sparkContext.clean(transformFunc)
-    val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
-      assert(rdds.length == 2)
-      val rdd1 = rdds(0).asInstanceOf[RDD[T]]
-      val rdd2 = rdds(1).asInstanceOf[RDD[U]]
-      cleanedF(rdd1, rdd2, time)
-    }
-    new TransformedDStream[V](Seq(this, other), realTransformFunc)
-  }
-
-  /**
-   * Print the first ten elements of each RDD generated in this DStream. This is an output
-   * operator, so this DStream will be registered as an output stream and there materialized.
-   */
-  def print() {
-    def foreachFunc = (rdd: RDD[T], time: Time) => {
-      val first11 = rdd.take(11)
-      println ("-------------------------------------------")
-      println ("Time: " + time)
-      println ("-------------------------------------------")
-      first11.take(10).foreach(println)
-      if (first11.size > 10) println("...")
-      println()
-    }
-    val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
-    ssc.registerOutputStream(newStream)
-  }
-
-  /**
-   * Return a new DStream in which each RDD contains all the elements in seen in a
-   * sliding window of time over this DStream. The new DStream generates RDDs with
-   * the same interval as this DStream.
-   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
-   */
-  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
-
-  /**
-   * Return a new DStream in which each RDD contains all the elements in seen in a
-   * sliding window of time over this DStream.
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   */
-  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
-    new WindowedDStream(this, windowDuration, slideDuration)
-  }
-
-  /**
-   * Return a new DStream in which each RDD has a single element generated by reducing all
-   * elements in a sliding window over this DStream.
-   * @param reduceFunc associative reduce function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   */
-  def reduceByWindow(
-      reduceFunc: (T, T) => T,
-      windowDuration: Duration,
-      slideDuration: Duration
-    ): DStream[T] = {
-    this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
-  }
-
-  /**
-   * Return a new DStream in which each RDD has a single element generated by reducing all
-   * elements in a sliding window over this DStream. However, the reduction is done incrementally
-   * using the old window's reduced value :
-   *  1. reduce the new values that entered the window (e.g., adding new counts)
-   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
-   *  This is more efficient than reduceByWindow without "inverse reduce" function.
-   *  However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc associative reduce function
-   * @param invReduceFunc inverse reduce function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   */
-  def reduceByWindow(
-      reduceFunc: (T, T) => T,
-      invReduceFunc: (T, T) => T,
-      windowDuration: Duration,
-      slideDuration: Duration
-    ): DStream[T] = {
-      this.map(x => (1, x))
-          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
-          .map(_._2)
-  }
-
-  /**
-   * Return a new DStream in which each RDD has a single element generated by counting the number
-   * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
-   * Spark's default number of partitions.
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   */
-  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
-    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
-  }
-
-  /**
-   * Return a new DStream in which each RDD contains the count of distinct elements in
-   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
-   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
-   * `numPartitions` not specified).
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   * @param numPartitions  number of partitions of each RDD in the new DStream.
-   */
-  def countByValueAndWindow(
-      windowDuration: Duration,
-      slideDuration: Duration,
-      numPartitions: Int = ssc.sc.defaultParallelism
-    ): DStream[(T, Long)] = {
-
-    this.map(x => (x, 1L)).reduceByKeyAndWindow(
-      (x: Long, y: Long) => x + y,
-      (x: Long, y: Long) => x - y,
-      windowDuration,
-      slideDuration,
-      numPartitions,
-      (x: (T, Long)) => x._2 != 0L
-    )
-  }
-
-  /**
-   * Return a new DStream by unifying data of another DStream with this DStream.
-   * @param that Another DStream having the same slideDuration as this DStream.
-   */
-  def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
-
-  /**
-   * Return all the RDDs defined by the Interval object (both end times included)
-   */
-  def slice(interval: Interval): Seq[RDD[T]] = {
-    slice(interval.beginTime, interval.endTime)
-  }
-
-  /**
-   * Return all the RDDs between 'fromTime' to 'toTime' (both included)
-   */
-  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
-    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
-      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
-    }
-    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
-      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
-    }
-    val alignedToTime = toTime.floor(slideDuration)
-    val alignedFromTime = fromTime.floor(slideDuration)
-
-    logInfo("Slicing from " + fromTime + " to " + toTime +
-      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
-
-    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
-      if (time >= zeroTime) getOrCompute(time) else None
-    })
-  }
-
-  /**
-   * Save each RDD in this DStream as a Sequence file of serialized objects.
-   * The file name at each batch interval is generated based on `prefix` and
-   * `suffix`: "prefix-TIME_IN_MS.suffix".
-   */
-  def saveAsObjectFiles(prefix: String, suffix: String = "") {
-    val saveFunc = (rdd: RDD[T], time: Time) => {
-      val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsObjectFile(file)
-    }
-    this.foreach(saveFunc)
-  }
-
-  /**
-   * Save each RDD in this DStream as at text file, using string representation
-   * of elements. The file name at each batch interval is generated based on
-   * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
-   */
-  def saveAsTextFiles(prefix: String, suffix: String = "") {
-    val saveFunc = (rdd: RDD[T], time: Time) => {
-      val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsTextFile(file)
-    }
-    this.foreach(saveFunc)
-  }
-
-  def register() {
-    ssc.registerOutputStream(this)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
deleted file mode 100644
index 671f7bb..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import scala.collection.mutable.{HashMap, HashSet}
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.FileSystem
-
-import org.apache.spark.Logging
-
-import java.io.{ObjectInputStream, IOException}
-
-private[streaming]
-class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
-  extends Serializable with Logging {
-  protected val data = new HashMap[Time, AnyRef]()
-
-  // Mapping of the batch time to the checkpointed RDD file of that time
-  @transient private var timeToCheckpointFile = new HashMap[Time, String]
-  // Mapping of the batch time to the time of the oldest checkpointed RDD
-  // in that batch's checkpoint data
-  @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
-
-  @transient private var fileSystem : FileSystem = null
-  protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
-
-  /**
-   * Updates the checkpoint data of the DStream. This gets called every time
-   * the graph checkpoint is initiated. Default implementation records the
-   * checkpoint files to which the generated RDDs of the DStream have been saved.
-   */
-  def update(time: Time) {
-
-    // Get the checkpointed RDDs from the generated RDDs
-    val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
-                                       .map(x => (x._1, x._2.getCheckpointFile.get))
-    logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
-
-    // Add the checkpoint files to the data to be serialized 
-    if (!checkpointFiles.isEmpty) {
-      currentCheckpointFiles.clear()
-      currentCheckpointFiles ++= checkpointFiles
-      // Add the current checkpoint files to the map of all checkpoint files
-      // This will be used to delete old checkpoint files
-      timeToCheckpointFile ++= currentCheckpointFiles
-      // Remember the time of the oldest checkpoint RDD in current state
-      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
-    }
-  }
-
-  /**
-   * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been
-   * written to the checkpoint directory.
-   */
-  def cleanup(time: Time) {
-    // Get the time of the oldest checkpointed RDD that was written as part of the
-    // checkpoint of `time`
-    timeToOldestCheckpointFileTime.remove(time) match {
-      case Some(lastCheckpointFileTime) =>
-        // Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime`
-        // This is because checkpointed RDDs older than this are not going to be needed
-        // even after master fails, as the checkpoint data of `time` does not refer to those files
-        val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
-        logDebug("Files to delete:\n" + filesToDelete.mkString(","))
-        filesToDelete.foreach {
-          case (time, file) =>
-            try {
-              val path = new Path(file)
-              if (fileSystem == null) {
-                fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
-              }
-              fileSystem.delete(path, true)
-              timeToCheckpointFile -= time
-              logInfo("Deleted checkpoint file '" + file + "' for time " + time)
-            } catch {
-              case e: Exception =>
-                logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)
-                fileSystem = null
-            }
-        }
-      case None =>
-        logInfo("Nothing to delete")
-    }
-  }
-
-  /**
-   * Restore the checkpoint data. This gets called once when the DStream graph
-   * (along with its DStreams) is being restored from a graph checkpoint file.
-   * Default implementation restores the RDDs from their checkpoint files.
-   */
-  def restore() {
-    // Create RDDs from the checkpoint data
-    currentCheckpointFiles.foreach {
-      case(time, file) => {
-        logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
-        dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
-      }
-    }
-  }
-
-  override def toString() = {
-    "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]"
-  }
-
-  @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
-    ois.defaultReadObject()
-    timeToOldestCheckpointFileTime = new HashMap[Time, Time]
-    timeToCheckpointFile = new HashMap[Time, String]
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 668e532..31038a0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.streaming
 
-import org.apache.spark.streaming.dstream.{NetworkInputDStream, InputDStream}
+import scala.collection.mutable.ArrayBuffer
 import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
-import collection.mutable.ArrayBuffer
 import org.apache.spark.Logging
 import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream}
 
 final private[streaming] class DStreamGraph extends Serializable with Logging {
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
deleted file mode 100644
index 56dbcbd..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ /dev/null
@@ -1,621 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream._
-
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
-import org.apache.spark.storage.StorageLevel
-
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.{ClassTag, classTag}
-
-import org.apache.hadoop.mapred.{JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.conf.Configuration
-
-class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
-extends Serializable {
-
-  private[streaming] def ssc = self.ssc
-
-  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
-    new HashPartitioner(numPartitions)
-  }
-
-  /**
-   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
-   * generate the RDDs with Spark's default number of partitions.
-   */
-  def groupByKey(): DStream[(K, Seq[V])] = {
-    groupByKey(defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
-   * generate the RDDs with `numPartitions` partitions.
-   */
-  def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
-    groupByKey(defaultPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
-   * is used to control the partitioning of each RDD.
-   */
-  def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
-    val createCombiner = (v: V) => ArrayBuffer[V](v)
-    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
-    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
-    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
-      .asInstanceOf[DStream[(K, Seq[V])]]
-  }
-
-  /**
-   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
-   * with Spark's default number of partitions.
-   */
-  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
-    reduceByKey(reduceFunc, defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
-   * with `numPartitions` partitions.
-   */
-  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
-    reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
-   * partitioning of each RDD.
-   */
-  def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
-    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
-    combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
-  }
-
-  /**
-   * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
-   * combineByKey for RDDs. Please refer to combineByKey in
-   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
-   */
-  def combineByKey[C: ClassTag](
-    createCombiner: V => C,
-    mergeValue: (C, V) => C,
-    mergeCombiner: (C, C) => C,
-    partitioner: Partitioner,
-    mapSideCombine: Boolean = true): DStream[(K, C)] = {
-    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
-  }
-
-  /**
-   * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
-   * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
-   * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
-   * Spark's default number of partitions.
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   */
-  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
-    groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
-   * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
-   * generate the RDDs with Spark's default number of partitions.
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   */
-  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
-    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
-   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
-   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   * @param numPartitions  number of partitions of each RDD in the new DStream; if not specified
-   *                       then Spark's default number of partitions will be used
-   */
-  def groupByKeyAndWindow(
-      windowDuration: Duration,
-      slideDuration: Duration,
-      numPartitions: Int
-    ): DStream[(K, Seq[V])] = {
-    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
-  }
-
-  /**
-   * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
-   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
-   */
-  def groupByKeyAndWindow(
-      windowDuration: Duration,
-      slideDuration: Duration,
-      partitioner: Partitioner
-    ): DStream[(K, Seq[V])] = {
-    val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
-    val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
-    val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
-    self.groupByKey(partitioner)
-        .window(windowDuration, slideDuration)
-        .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
-        .asInstanceOf[DStream[(K, Seq[V])]]
-  }
-
-  /**
-   * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
-   * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
-   * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
-   * the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   */
-  def reduceByKeyAndWindow(
-      reduceFunc: (V, V) => V,
-      windowDuration: Duration
-    ): DStream[(K, V)] = {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
-   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
-   * generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   */
-  def reduceByKeyAndWindow(
-      reduceFunc: (V, V) => V,
-      windowDuration: Duration,
-      slideDuration: Duration
-    ): DStream[(K, V)] = {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
-   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
-   * generate the RDDs with `numPartitions` partitions.
-   * @param reduceFunc associative reduce function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   * @param numPartitions  number of partitions of each RDD in the new DStream.
-   */
-  def reduceByKeyAndWindow(
-      reduceFunc: (V, V) => V,
-      windowDuration: Duration,
-      slideDuration: Duration,
-      numPartitions: Int
-    ): DStream[(K, V)] = {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
-   * `DStream.reduceByKey()`, but applies it over a sliding window.
-   * @param reduceFunc associative reduce function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   * @param partitioner    partitioner for controlling the partitioning of each RDD
-   *                       in the new DStream.
-   */
-  def reduceByKeyAndWindow(
-      reduceFunc: (V, V) => V,
-      windowDuration: Duration,
-      slideDuration: Duration,
-      partitioner: Partitioner
-    ): DStream[(K, V)] = {
-    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
-    self.reduceByKey(cleanedReduceFunc, partitioner)
-        .window(windowDuration, slideDuration)
-        .reduceByKey(cleanedReduceFunc, partitioner)
-  }
-
-  /**
-   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
-   * The reduced value over a new window is calculated using the old window's reduced value:
-   *  1. reduce the new values that entered the window (e.g., adding new counts)
-   *
-   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
-   *
-   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
-   * However, it is applicable to only "invertible reduce functions".
-   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
-   * @param invReduceFunc inverse reduce function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   * @param filterFunc     Optional function to filter expired key-value pairs;
-   *                       only pairs that satisfy the function are retained
-   */
-  def reduceByKeyAndWindow(
-      reduceFunc: (V, V) => V,
-      invReduceFunc: (V, V) => V,
-      windowDuration: Duration,
-      slideDuration: Duration = self.slideDuration,
-      numPartitions: Int = ssc.sc.defaultParallelism,
-      filterFunc: ((K, V)) => Boolean = null
-    ): DStream[(K, V)] = {
-
-    reduceByKeyAndWindow(
-      reduceFunc, invReduceFunc, windowDuration,
-      slideDuration, defaultPartitioner(numPartitions), filterFunc
-    )
-  }
-
-  /**
-   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
-   * The reduced value over a new window is calculated using the old window's reduced value:
-   *  1. reduce the new values that entered the window (e.g., adding new counts)
-   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
-   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
-   * However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc     associative reduce function
-   * @param invReduceFunc  inverse reduce function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
-   * @param filterFunc     Optional function to filter expired key-value pairs;
-   *                       only pairs that satisfy the function are retained
-   */
-  def reduceByKeyAndWindow(
-      reduceFunc: (V, V) => V,
-      invReduceFunc: (V, V) => V,
-      windowDuration: Duration,
-      slideDuration: Duration,
-      partitioner: Partitioner,
-      filterFunc: ((K, V)) => Boolean
-    ): DStream[(K, V)] = {
-
-    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
-    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
-    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
-    new ReducedWindowedDStream[K, V](
-      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
-      windowDuration, slideDuration, partitioner
-    )
-  }
-
-  /**
-   * Return a new "state" DStream where the state for each key is updated by applying
-   * the given function on the previous state of the key and the new values of each key.
-   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
-   * @param updateFunc State update function. If `this` function returns None, then
-   *                   corresponding state key-value pair will be eliminated.
-   * @tparam S State type
-   */
-  def updateStateByKey[S: ClassTag](
-      updateFunc: (Seq[V], Option[S]) => Option[S]
-    ): DStream[(K, S)] = {
-    updateStateByKey(updateFunc, defaultPartitioner())
-  }
-
-  /**
-   * Return a new "state" DStream where the state for each key is updated by applying
-   * the given function on the previous state of the key and the new values of each key.
-   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   * @param updateFunc State update function. If `this` function returns None, then
-   *                   corresponding state key-value pair will be eliminated.
-   * @param numPartitions Number of partitions of each RDD in the new DStream.
-   * @tparam S State type
-   */
-  def updateStateByKey[S: ClassTag](
-      updateFunc: (Seq[V], Option[S]) => Option[S],
-      numPartitions: Int
-    ): DStream[(K, S)] = {
-    updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a new "state" DStream where the state for each key is updated by applying
-   * the given function on the previous state of the key and the new values of the key.
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
-   * @param updateFunc State update function. If `this` function returns None, then
-   *                   corresponding state key-value pair will be eliminated.
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
-   * @tparam S State type
-   */
-  def updateStateByKey[S: ClassTag](
-      updateFunc: (Seq[V], Option[S]) => Option[S],
-      partitioner: Partitioner
-    ): DStream[(K, S)] = {
-    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
-      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
-    }
-    updateStateByKey(newUpdateFunc, partitioner, true)
-  }
-
-  /**
-   * Return a new "state" DStream where the state for each key is updated by applying
-   * the given function on the previous state of the key and the new values of each key.
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
-   * @param updateFunc State update function. If `this` function returns None, then
-   *                   corresponding state key-value pair will be eliminated. Note that
-   *                   this function may generate a tuple with a different key
-   *                   than the input key. It is up to the developer to decide whether to
-   *                   remember the partitioner despite the key being changed.
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
-   * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
-   * @tparam S State type
-   */
-  def updateStateByKey[S: ClassTag](
-      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
-      partitioner: Partitioner,
-      rememberPartitioner: Boolean
-    ): DStream[(K, S)] = {
-     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
-  }
-
-  /**
-   * Return a new DStream by applying a map function to the value of each key-value pair in
-   * 'this' DStream without changing the key.
-   */
-  def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
-    new MapValuedDStream[K, V, U](self, mapValuesFunc)
-  }
-
-  /**
-   * Return a new DStream by applying a flatmap function to the value of each key-value pair in
-   * 'this' DStream without changing the key.
-   */
-  def flatMapValues[U: ClassTag](
-      flatMapValuesFunc: V => TraversableOnce[U]
-    ): DStream[(K, U)] = {
-    new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
-  }
-
-  /**
-   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
-   * Hash partitioning is used to generate the RDDs with Spark's default number
-   * of partitions.
-   */
-  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
-    cogroup(other, defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
-   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   */
-  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
-    cogroup(other, defaultPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
-   */
-  def cogroup[W: ClassTag](
-      other: DStream[(K, W)],
-      partitioner: Partitioner
-    ): DStream[(K, (Seq[V], Seq[W]))] = {
-    self.transformWith(
-      other,
-      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
-    )
-  }
-
-  /**
-   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
-   */
-  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
-    join[W](other, defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   */
-  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
-    join[W](other, defaultPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
-   */
-  def join[W: ClassTag](
-      other: DStream[(K, W)],
-      partitioner: Partitioner
-    ): DStream[(K, (V, W))] = {
-    self.transformWith(
-      other,
-      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
-    )
-  }
-
-  /**
-   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
-   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
-   * number of partitions.
-   */
-  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
-    leftOuterJoin[W](other, defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
-   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
-   * partitions.
-   */
-  def leftOuterJoin[W: ClassTag](
-      other: DStream[(K, W)],
-      numPartitions: Int
-    ): DStream[(K, (V, Option[W]))] = {
-    leftOuterJoin[W](other, defaultPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
-   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
-   * the partitioning of each RDD.
-   */
-  def leftOuterJoin[W: ClassTag](
-      other: DStream[(K, W)],
-      partitioner: Partitioner
-    ): DStream[(K, (V, Option[W]))] = {
-    self.transformWith(
-      other,
-      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
-    )
-  }
-
-  /**
-   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
-   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
-   * number of partitions.
-   */
-  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
-    rightOuterJoin[W](other, defaultPartitioner())
-  }
-
-  /**
-   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
-   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
-   * partitions.
-   */
-  def rightOuterJoin[W: ClassTag](
-      other: DStream[(K, W)],
-      numPartitions: Int
-    ): DStream[(K, (Option[V], W))] = {
-    rightOuterJoin[W](other, defaultPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
-   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
-   * the partitioning of each RDD.
-   */
-  def rightOuterJoin[W: ClassTag](
-      other: DStream[(K, W)],
-      partitioner: Partitioner
-    ): DStream[(K, (Option[V], W))] = {
-    self.transformWith(
-      other,
-      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
-    )
-  }
-
-  /**
-   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
-   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
-   */
-  def saveAsHadoopFiles[F <: OutputFormat[K, V]](
-      prefix: String,
-      suffix: String
-    )(implicit fm: ClassTag[F]) {
-    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
-  }
-
-  /**
-   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
-   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
-   */
-  def saveAsHadoopFiles(
-      prefix: String,
-      suffix: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
-    ) {  
-    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
-      val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
-    }
-    self.foreach(saveFunc)
-  }
-
-  /**
-   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
-   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
-   */
-  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
-      prefix: String,
-      suffix: String
-    )(implicit fm: ClassTag[F])  {
-    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
-  }
-
-  /**
-   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
-   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
-   */
-  def saveAsNewAPIHadoopFiles(
-      prefix: String,
-      suffix: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
-    ) {
-    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
-      val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
-    }
-    self.foreach(saveFunc)
-  }
-
-  private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
-
-  private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index d29033d..c92854c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.streaming.api.java
 
-import org.apache.spark.streaming.{Duration, Time, DStream}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.rdd.RDD
 
 import scala.reflect.ClassTag
+import org.apache.spark.streaming.dstream.DStream
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 64f38ce..d3cd52a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -30,6 +30,7 @@ import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
 import java.util
 import org.apache.spark.rdd.RDD
 import JavaDStream._
+import org.apache.spark.streaming.dstream.DStream
 
 trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
     extends Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 6c3467d..6bb985c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -35,6 +35,7 @@ import org.apache.spark.storage.StorageLevel
 import com.google.common.base.Optional
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PairRDDFunctions
+import org.apache.spark.streaming.dstream.DStream
 
 class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     implicit val kManifest: ClassTag[K],

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index ea7f7da..03b4223 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -36,6 +36,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamingListener
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.dstream.DStream
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic


[9/9] git commit: Merge pull request #400 from tdas/dstream-move

Posted by pw...@apache.org.
Merge pull request #400 from tdas/dstream-move

Moved DStream and PairDStream to org.apache.spark.streaming.dstream

Similar to the package location of `org.apache.spark.rdd.RDD`, `DStream` has been moved from `org.apache.spark.streaming.DStream` to `org.apache.spark.streaming.dstream.DStream`. I know that the package name is a little long, but I think it's better to keep it consistent with Spark's structure.

Also fixed persistence of windowed DStream. The RDDs generated by a windowed DStream are essentially unions of underlying RDDs, and persisting these union RDDs would store numerous copies of the underlying data. Instead, setting the persistence level on the windowed DStream now sets the persistence level of the underlying DStream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b93f9d42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b93f9d42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b93f9d42

Branch: refs/heads/master
Commit: b93f9d42f21f03163734ef97b2871db945e166da
Parents: e6ed13f ffa1d38
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 13 12:18:05 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 13 12:18:05 2014 -0800

----------------------------------------------------------------------
 docs/streaming-programming-guide.md             |   2 +-
 .../examples/RecoverableNetworkWordCount.scala  |   2 +-
 .../spark/streaming/flume/FlumeUtils.scala      |   4 +-
 .../spark/streaming/kafka/KafkaUtils.scala      |   6 +-
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |   4 +-
 .../spark/streaming/twitter/TwitterUtils.scala  |   7 +-
 .../spark/streaming/zeromq/ZeroMQUtils.scala    |   3 +-
 .../org/apache/spark/streaming/DStream.scala    | 756 ------------------
 .../spark/streaming/DStreamCheckpointData.scala | 128 ----
 .../apache/spark/streaming/DStreamGraph.scala   |   6 +-
 .../spark/streaming/PairDStreamFunctions.scala  | 621 ---------------
 .../spark/streaming/StreamingContext.scala      |   5 +-
 .../spark/streaming/api/java/JavaDStream.scala  |   3 +-
 .../streaming/api/java/JavaDStreamLike.scala    |   1 +
 .../streaming/api/java/JavaPairDStream.scala    |   1 +
 .../api/java/JavaStreamingContext.scala         |   5 +-
 .../spark/streaming/dstream/DStream.scala       | 757 +++++++++++++++++++
 .../dstream/DStreamCheckpointData.scala         | 126 +++
 .../streaming/dstream/FileInputDStream.scala    |   2 +-
 .../streaming/dstream/FilteredDStream.scala     |   2 +-
 .../dstream/FlatMapValuedDStream.scala          |   2 +-
 .../streaming/dstream/FlatMappedDStream.scala   |   2 +-
 .../streaming/dstream/ForEachDStream.scala      |   2 +-
 .../streaming/dstream/GlommedDStream.scala      |   2 +-
 .../spark/streaming/dstream/InputDStream.scala  |   2 +-
 .../dstream/MapPartitionedDStream.scala         |   2 +-
 .../streaming/dstream/MapValuedDStream.scala    |   2 +-
 .../spark/streaming/dstream/MappedDStream.scala |   2 +-
 .../dstream/PairDStreamFunctions.scala          | 622 +++++++++++++++
 .../dstream/ReducedWindowedDStream.scala        |   2 +-
 .../streaming/dstream/ShuffledDStream.scala     |   2 +-
 .../spark/streaming/dstream/StateDStream.scala  |   2 +-
 .../streaming/dstream/TransformedDStream.scala  |   2 +-
 .../spark/streaming/dstream/UnionDStream.scala  |   3 +-
 .../streaming/dstream/WindowedDStream.scala     |  17 +-
 .../streaming/util/MasterFailureTest.scala      |   2 +-
 .../spark/streaming/BasicOperationsSuite.scala  |   1 +
 .../spark/streaming/CheckpointSuite.scala       |   2 +-
 .../spark/streaming/StreamingContextSuite.scala |   3 +-
 .../streaming/StreamingListenerSuite.scala      |   1 +
 .../apache/spark/streaming/TestSuiteBase.scala  |   2 +-
 .../spark/streaming/WindowOperationsSuite.scala |  15 +
 .../tools/JavaAPICompletenessChecker.scala      |  50 +-
 43 files changed, 1613 insertions(+), 1570 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b93f9d42/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b93f9d42/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b93f9d42/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b93f9d42/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b93f9d42/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------


[2/9] Moved DStream, DStreamCheckpointData and PairDStream from org.apache.spark.streaming to org.apache.spark.streaming.dstream.

Posted by pw...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
new file mode 100644
index 0000000..fd72ebc
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -0,0 +1,742 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
+
+import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.Duration
+
+
+/**
+ * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
+ * for more details on RDDs). DStreams can either be created from live data (such as, data from
+ * HDFS, Kafka or Flume) or it can be generated by transforming existing DStreams using operations
+ * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
+ * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
+ * by a parent DStream.
+ *
+ * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
+ * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
+ * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
+ * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ *
+ * A DStream is internally characterized by a few basic properties:
+ *  - A list of other DStreams that the DStream depends on
+ *  - A time interval at which the DStream generates an RDD
+ *  - A function that is used to generate an RDD after each time interval
+ */
+
+abstract class DStream[T: ClassTag] (
+    @transient private[streaming] var ssc: StreamingContext
+  ) extends Serializable with Logging {
+
+  // =======================================================================
+  // Methods that should be implemented by subclasses of DStream
+  // =======================================================================
+
+  /** Time interval after which the DStream generates a RDD */
+  def slideDuration: Duration
+
+  /** List of parent DStreams on which this DStream depends */
+  def dependencies: List[DStream[_]]
+
+  /** Method that generates a RDD for the given time */
+  def compute (validTime: Time): Option[RDD[T]]
+
+  // =======================================================================
+  // Methods and fields available on all DStreams
+  // =======================================================================
+
+  // RDDs generated, marked as private[streaming] so that testsuites can access it
+  @transient
+  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
+
+  // Time zero for the DStream
+  private[streaming] var zeroTime: Time = null
+
+  // Duration for which the DStream will remember each RDD created
+  private[streaming] var rememberDuration: Duration = null
+
+  // Storage level of the RDDs in the stream
+  private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
+
+  // Checkpoint details
+  private[streaming] val mustCheckpoint = false
+  private[streaming] var checkpointDuration: Duration = null
+  private[streaming] val checkpointData = new DStreamCheckpointData(this)
+
+  // Reference to whole DStream graph
+  private[streaming] var graph: DStreamGraph = null
+
+  private[streaming] def isInitialized = (zeroTime != null)
+
+  // Duration for which the DStream requires its parent DStream to remember each RDD created
+  private[streaming] def parentRememberDuration = rememberDuration
+
+  /** Return the StreamingContext associated with this DStream */
+  def context = ssc
+
+  /** Persist the RDDs of this DStream with the given storage level */
+  def persist(level: StorageLevel): DStream[T] = {
+    if (this.isInitialized) {
+      throw new UnsupportedOperationException(
+        "Cannot change storage level of an DStream after streaming context has started")
+    }
+    this.storageLevel = level
+    this
+  }
+
+  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
+
+  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+  def cache(): DStream[T] = persist()
+
+  /**
+   * Enable periodic checkpointing of RDDs of this DStream
+   * @param interval Time interval after which generated RDD will be checkpointed
+   */
+  def checkpoint(interval: Duration): DStream[T] = {
+    if (isInitialized) {
+      throw new UnsupportedOperationException(
+        "Cannot change checkpoint interval of an DStream after streaming context has started")
+    }
+    persist()
+    checkpointDuration = interval
+    this
+  }
+
+  /**
+   * Initialize the DStream by setting the "zero" time, based on which
+   * the validity of future times is calculated. This method also recursively initializes
+   * its parent DStreams.
+   */
+  private[streaming] def initialize(time: Time) {
+    if (zeroTime != null && zeroTime != time) {
+      throw new Exception("ZeroTime is already initialized to " + zeroTime
+        + ", cannot initialize it again to " + time)
+    }
+    zeroTime = time
+
+    // Set the checkpoint interval to be slideDuration or 10 seconds, whichever is larger
+    if (mustCheckpoint && checkpointDuration == null) {
+      checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
+      logInfo("Checkpoint interval automatically set to " + checkpointDuration)
+    }
+
+    // Set the minimum value of the rememberDuration if not already set
+    var minRememberDuration = slideDuration
+    if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
+      minRememberDuration = checkpointDuration * 2  // times 2 just to be sure that the latest checkpoint is not forgetten
+    }
+    if (rememberDuration == null || rememberDuration < minRememberDuration) {
+      rememberDuration = minRememberDuration
+    }
+
+    // Initialize the dependencies
+    dependencies.foreach(_.initialize(zeroTime))
+  }
+
+  private[streaming] def validate() {
+    assert(rememberDuration != null, "Remember duration is set to null")
+
+    assert(
+      !mustCheckpoint || checkpointDuration != null,
+      "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
+        " Please use DStream.checkpoint() to set the interval."
+    )
+
+    assert(
+     checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
+      "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
+      " or SparkContext.checkpoint() to set the checkpoint directory."
+    )
+
+    assert(
+      checkpointDuration == null || checkpointDuration >= slideDuration,
+      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
+        checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
+        "Please set it to at least " + slideDuration + "."
+    )
+
+    assert(
+      checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
+      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
+        checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
+        "Please set it to a multiple " + slideDuration + "."
+    )
+
+    assert(
+      checkpointDuration == null || storageLevel != StorageLevel.NONE,
+      "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
+        "level has not been set to enable persisting. Please use DStream.persist() to set the " +
+        "storage level to use memory for better checkpointing performance."
+    )
+
+    assert(
+      checkpointDuration == null || rememberDuration > checkpointDuration,
+      "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
+        rememberDuration + " which is not more than the checkpoint interval (" +
+        checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
+    )
+
+    val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
+    logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
+    assert(
+      metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
+      "It seems you are doing some DStream window operation or setting a checkpoint interval " +
+        "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
+        "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
+        "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
+        "set the Java property 'spark.cleaner.delay' to more than " +
+        math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
+    )
+
+    dependencies.foreach(_.validate())
+
+    logInfo("Slide time = " + slideDuration)
+    logInfo("Storage level = " + storageLevel)
+    logInfo("Checkpoint interval = " + checkpointDuration)
+    logInfo("Remember duration = " + rememberDuration)
+    logInfo("Initialized and validated " + this)
+  }
+
+  private[streaming] def setContext(s: StreamingContext) {
+    if (ssc != null && ssc != s) {
+      throw new Exception("Context is already set in " + this + ", cannot set it again")
+    }
+    ssc = s
+    logInfo("Set context for " + this)
+    dependencies.foreach(_.setContext(ssc))
+  }
+
+  private[streaming] def setGraph(g: DStreamGraph) {
+    if (graph != null && graph != g) {
+      throw new Exception("Graph is already set in " + this + ", cannot set it again")
+    }
+    graph = g
+    dependencies.foreach(_.setGraph(graph))
+  }
+
+  private[streaming] def remember(duration: Duration) {
+    if (duration != null && duration > rememberDuration) {
+      rememberDuration = duration
+      logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
+    }
+    dependencies.foreach(_.remember(parentRememberDuration))
+  }
+
+  /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
+  private[streaming] def isTimeValid(time: Time): Boolean = {
+    if (!isInitialized) {
+      throw new Exception (this + " has not been initialized")
+    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
+      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
+      false
+    } else {
+      logDebug("Time " + time + " is valid")
+      true
+    }
+  }
+
+  /**
+   * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
+   * method that should not be called directly.
+   */
+  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
+    // If this DStream was not initialized (i.e., zeroTime not set), then do it
+    // If RDD was already generated, then retrieve it from HashMap
+    generatedRDDs.get(time) match {
+
+      // If an RDD was already generated and is being reused, then
+      // probably all RDDs in this DStream will be reused and hence should be cached
+      case Some(oldRDD) => Some(oldRDD)
+
+      // if RDD was not generated, and if the time is valid
+      // (based on sliding time of this DStream), then generate the RDD
+      case None => {
+        if (isTimeValid(time)) {
+          compute(time) match {
+            case Some(newRDD) =>
+              if (storageLevel != StorageLevel.NONE) {
+                newRDD.persist(storageLevel)
+                logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
+              }
+              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
+                newRDD.checkpoint()
+                logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
+              }
+              generatedRDDs.put(time, newRDD)
+              Some(newRDD)
+            case None =>
+              None
+          }
+        } else {
+          None
+        }
+      }
+    }
+  }
+
+  /**
+   * Generate a SparkStreaming job for the given time. This is an internal method that
+   * should not be called directly. This default implementation creates a job
+   * that materializes the corresponding RDD. Subclasses of DStream may override this
+   * to generate their own jobs.
+   */
+  private[streaming] def generateJob(time: Time): Option[Job] = {
+    getOrCompute(time) match {
+      case Some(rdd) => {
+        val jobFunc = () => {
+          val emptyFunc = { (iterator: Iterator[T]) => {} }
+          context.sparkContext.runJob(rdd, emptyFunc)
+        }
+        Some(new Job(time, jobFunc))
+      }
+      case None => None
+    }
+  }
+
+  /**
+   * Clear metadata that are older than `rememberDuration` of this DStream.
+   * This is an internal method that should not be called directly. This default
+   * implementation clears the old generated RDDs. Subclasses of DStream may override
+   * this to clear their own metadata along with the generated RDDs.
+   */
+  private[streaming] def clearMetadata(time: Time) {
+    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
+    generatedRDDs --= oldRDDs.keys
+    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
+      (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+    dependencies.foreach(_.clearMetadata(time))
+  }
+
+  /* Adds metadata to the Stream while it is running.
+   * This method should be overridden by subclasses of InputDStream.
+   */
+  private[streaming] def addMetadata(metadata: Any) {
+    if (metadata != null) {
+      logInfo("Dropping Metadata: " + metadata.toString)
+    }
+  }
+
+  /**
+   * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
+   * this stream. This is an internal method that should not be called directly. This is
+   * a default implementation that saves only the file names of the checkpointed RDDs to
+   * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
+   * this method to save custom checkpoint data.
+   */
+  private[streaming] def updateCheckpointData(currentTime: Time) {
+    logInfo("Updating checkpoint data for time " + currentTime)
+    checkpointData.update(currentTime)
+    dependencies.foreach(_.updateCheckpointData(currentTime))
+    logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
+  }
+
+  private[streaming] def clearCheckpointData(time: Time) {
+    logInfo("Clearing checkpoint data")
+    checkpointData.cleanup(time)
+    dependencies.foreach(_.clearCheckpointData(time))
+    logInfo("Cleared checkpoint data")
+  }
+
+  /**
+   * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
+   * that should not be called directly. This is a default implementation that recreates RDDs
+   * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
+   * override the updateCheckpointData() method would also need to override this method.
+   */
+  private[streaming] def restoreCheckpointData() {
+    // Create RDDs from the checkpoint data
+    logInfo("Restoring checkpoint data")
+    checkpointData.restore()
+    dependencies.foreach(_.restoreCheckpointData())
+    logInfo("Restored checkpoint data")
+  }
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    logDebug(this.getClass().getSimpleName + ".writeObject used")
+    if (graph != null) {
+      graph.synchronized {
+        if (graph.checkpointInProgress) {
+          oos.defaultWriteObject()
+        } else {
+          val msg = "Object of " + this.getClass.getName + " is being serialized " +
+            " possibly as a part of closure of an RDD operation. This is because " +
+            " the DStream object is being referred to from within the closure. " +
+            " Please rewrite the RDD operation inside this DStream to avoid this. " +
+            " This has been enforced to avoid bloating of Spark tasks " +
+            " with unnecessary objects."
+          throw new java.io.NotSerializableException(msg)
+        }
+      }
+    } else {
+      throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
+    }
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(ois: ObjectInputStream) {
+    logDebug(this.getClass().getSimpleName + ".readObject used")
+    ois.defaultReadObject()
+    generatedRDDs = new HashMap[Time, RDD[T]] ()
+  }
+
+  // =======================================================================
+  // DStream operations
+  // =======================================================================
+
+  /** Return a new DStream by applying a function to all elements of this DStream. */
+  def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
+    new MappedDStream(this, context.sparkContext.clean(mapFunc))
+  }
+
+  /**
+   * Return a new DStream by applying a function to all elements of this DStream,
+   * and then flattening the results
+   */
+  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
+    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
+  }
+
+  /** Return a new DStream containing only the elements that satisfy a predicate. */
+  def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+
+  /**
+   * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+   * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+   * an array.
+   */
+  def glom(): DStream[Array[T]] = new GlommedDStream(this)
+
+
+  /**
+   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
+   * returned DStream has exactly numPartitions partitions.
+   */
+  def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions))
+
+  /**
+   * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
+   * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+   * of the RDD.
+   */
+  def mapPartitions[U: ClassTag](
+      mapPartFunc: Iterator[T] => Iterator[U],
+      preservePartitioning: Boolean = false
+    ): DStream[U] = {
+    new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
+  }
+
+  /**
+   * Return a new DStream in which each RDD has a single element generated by reducing each RDD
+   * of this DStream.
+   */
+  def reduce(reduceFunc: (T, T) => T): DStream[T] =
+    this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+
+  /**
+   * Return a new DStream in which each RDD has a single element generated by counting each RDD
+   * of this DStream.
+   */
+  def count(): DStream[Long] = {
+    this.map(_ => (null, 1L))
+        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
+        .reduceByKey(_ + _)
+        .map(_._2)
+  }
+
+  /**
+   * Return a new DStream in which each RDD contains the counts of each distinct value in
+   * each RDD of this DStream. Hash partitioning is used to generate
+   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+   * `numPartitions` not specified).
+   */
+  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+    this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   */
+  def foreach(foreachFunc: RDD[T] => Unit) {
+    this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
+  }
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   */
+  def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+    ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
+  }
+
+  /**
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of 'this' DStream.
+   */
+  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
+  }
+
+  /**
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of 'this' DStream.
+   */
+  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+    //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+    val cleanedF = context.sparkContext.clean(transformFunc)
+    val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
+      assert(rdds.length == 1)
+      cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
+    }
+    new TransformedDStream[U](Seq(this), realTransformFunc)
+  }
+
+  /**
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of 'this' DStream and 'other' DStream.
+   */
+  def transformWith[U: ClassTag, V: ClassTag](
+      other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
+    ): DStream[V] = {
+    val cleanedFunc = ssc.sparkContext.clean(transformFunc)
+    transformWith(other, (mine: RDD[T], theirs: RDD[U], _: Time) => cleanedFunc(mine, theirs))
+  }
+
+  /**
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of 'this' DStream and 'other' DStream.
+   */
+  def transformWith[U: ClassTag, V: ClassTag](
+      other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
+    ): DStream[V] = {
+    val cleanedFunc = ssc.sparkContext.clean(transformFunc)
+    val seqFunc = (parentRDDs: Seq[RDD[_]], batchTime: Time) => {
+      assert(parentRDDs.length == 2)
+      val mine = parentRDDs(0).asInstanceOf[RDD[T]]
+      val theirs = parentRDDs(1).asInstanceOf[RDD[U]]
+      cleanedFunc(mine, theirs, batchTime)
+    }
+    new TransformedDStream[V](Seq(this, other), seqFunc)
+  }
+
+  /**
+   * Print the first ten elements of each RDD generated in this DStream. This is an output
+   * operator, so this DStream will be registered as an output stream and therefore materialized.
+   */
+  def print() {
+    val printBatch = (rdd: RDD[T], time: Time) => {
+      // Fetch one element beyond the ten shown so we know whether to print an ellipsis
+      val fetched = rdd.take(11)
+      println("-------------------------------------------")
+      println("Time: " + time)
+      println("-------------------------------------------")
+      fetched.take(10).foreach(println)
+      if (fetched.size > 10) println("...")
+      println()
+    }
+    ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(printBatch)))
+  }
+
+  /**
+   * Return a new DStream in which each RDD contains all the elements seen in a
+   * sliding window of time over this DStream. The new DStream generates RDDs with
+   * the same interval as this DStream.
+   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
+   */
+  def window(windowDuration: Duration): DStream[T] = window(windowDuration, slideDuration)
+
+  /**
+   * Return a new DStream in which each RDD contains all the elements seen in a
+   * sliding window of time over this DStream.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+    new WindowedDStream[T](this, windowDuration, slideDuration)
+  }
+
+  /**
+   * Return a new DStream in which each RDD has a single element generated by reducing all
+   * elements in a sliding window over this DStream.
+   * @param reduceFunc associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def reduceByWindow(
+      reduceFunc: (T, T) => T,
+      windowDuration: Duration,
+      slideDuration: Duration): DStream[T] = {
+    val perBatch = reduce(reduceFunc)  // reduce each batch first, before windowing
+    perBatch.window(windowDuration, slideDuration).reduce(reduceFunc)
+  }
+
+  /**
+   * Return a new DStream in which each RDD has a single element generated by reducing all
+   * elements in a sliding window over this DStream. However, the reduction is done incrementally
+   * using the old window's reduced value :
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   *  This is more efficient than reduceByWindow without "inverse reduce" function.
+   *  However, it is applicable to only "invertible reduce functions".
+   * @param reduceFunc associative reduce function
+   * @param invReduceFunc inverse reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def reduceByWindow(
+      reduceFunc: (T, T) => T,
+      invReduceFunc: (T, T) => T,
+      windowDuration: Duration,
+      slideDuration: Duration
+    ): DStream[T] = {
+    // Give every element the same key so the keyed, invertible window reduction covers them all
+    val keyed = this.map(elem => (1, elem))
+    keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1).map(_._2)
+  }
+
+  /**
+   * Return a new DStream in which each RDD has a single element generated by counting the number
+   * of elements in a sliding window over this DStream. The count is maintained incrementally
+   * through the invertible `reduceByWindow`, which is invoked with a single partition.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] =
+    // Map every element to 1L; the windowed (invertible) sum of those ones is the count
+    map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
+
+  /**
+   * Return a new DStream in which each RDD contains the count of distinct elements in
+   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
+   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+   * `numPartitions` not specified).
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  number of partitions of each RDD in the new DStream.
+   */
+  def countByValueAndWindow(
+      windowDuration: Duration,
+      slideDuration: Duration,
+      numPartitions: Int = ssc.sc.defaultParallelism
+    ): DStream[(T, Long)] = {
+    // Count per value: add 1 for each element entering the window, subtract 1 for each leaving
+    this.map(elem => (elem, 1L)).reduceByKeyAndWindow(
+      (left: Long, right: Long) => left + right,
+      (total: Long, expired: Long) => total - expired,
+      windowDuration,
+      slideDuration,
+      numPartitions,
+      (pair: (T, Long)) => pair._2 != 0L
+    )
+  }
+
+  /**
+   * Return a new DStream by unifying data of another DStream with this DStream.
+   * @param that Another DStream having the same slideDuration as this DStream.
+   */
+  def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array[DStream[T]](this, that))
+
+  /**
+   * Return all the RDDs defined by the Interval object (both end times included)
+   */
+  def slice(interval: Interval): Seq[RDD[T]] =
+    // Delegate to the (fromTime, toTime) overload using the interval's two end points
+    slice(interval.beginTime, interval.endTime)
+
+  /**
+   * Return all the RDDs between 'fromTime' to 'toTime' (both included)
+   */
+  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
+      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+    }
+    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
+      logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+    }
+    // Floor both bounds to slideDuration before stepping through the range
+    val alignedToTime = toTime.floor(slideDuration)
+    val alignedFromTime = fromTime.floor(slideDuration)
+    logInfo("Slicing from " + fromTime + " to " + toTime +
+      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
+    // Times before zeroTime contribute nothing; for the rest, keep whatever getOrCompute yields
+    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+      if (time >= zeroTime) getOrCompute(time) else None
+    })
+  }
+
+  /**
+   * Save each RDD in this DStream as a Sequence file of serialized objects.
+   * The file name at each batch interval is generated based on `prefix` and
+   * `suffix`: "prefix-TIME_IN_MS.suffix".
+   */
+  def saveAsObjectFiles(prefix: String, suffix: String = "") {
+    // One output path per batch, derived from prefix, suffix and the batch time
+    val writeBatch = (rdd: RDD[T], time: Time) => {
+      rdd.saveAsObjectFile(rddToFileName(prefix, suffix, time))
+    }
+    foreach(writeBatch)
+  }
+
+  /**
+   * Save each RDD in this DStream as a text file, using string representation
+   * of elements. The file name at each batch interval is generated based on
+   * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+   */
+  def saveAsTextFiles(prefix: String, suffix: String = "") {
+    // One output path per batch, derived from prefix, suffix and the batch time
+    val writeBatch = (rdd: RDD[T], time: Time) => {
+      rdd.saveAsTextFile(rddToFileName(prefix, suffix, time))
+    }
+    foreach(writeBatch)
+  }
+
+  def register() {
+    ssc.registerOutputStream(this)  // registers this DStream itself as an output stream
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
new file mode 100644
index 0000000..2da4127
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
+import java.io.{ObjectInputStream, IOException}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Time
+
+private[streaming]
+class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
+  extends Serializable with Logging {
+  protected val data = new HashMap[Time, AnyRef]()
+
+  // Batch time -> checkpoint file of the RDD generated for that batch
+  @transient private var timeToCheckpointFile = new HashMap[Time, String]
+  // Checkpoint time -> time of the oldest checkpointed RDD referenced by
+  // the checkpoint data recorded at that time
+  @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
+
+  @transient private var fileSystem : FileSystem = null
+  protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
+
+  /**
+   * Updates the checkpoint data of the DStream. This gets called every time
+   * the graph checkpoint is initiated. Default implementation records the
+   * checkpoint files to which the generated RDDs of the DStream have been saved.
+   */
+  def update(time: Time) {
+    // Collect (batch time, checkpoint file) for every generated RDD that has been checkpointed
+    val checkpointFiles = dstream.generatedRDDs
+      .filter { case (_, rdd) => rdd.getCheckpointFile.isDefined }
+      .map { case (batchTime, rdd) => (batchTime, rdd.getCheckpointFile.get) }
+    logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
+
+    // Leave the previously recorded files untouched when nothing is checkpointed right now
+    if (checkpointFiles.nonEmpty) {
+      currentCheckpointFiles.clear()
+      currentCheckpointFiles ++= checkpointFiles
+      // Track them among all known checkpoint files so cleanup() can delete stale ones
+      timeToCheckpointFile ++= currentCheckpointFiles
+      // Remember the time of the oldest checkpointed RDD in the current state
+      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
+    }
+  }
+
+  /**
+   * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been
+   * written to the checkpoint directory.
+   */
+  def cleanup(time: Time) {
+    // Look up (and forget) the time of the oldest checkpointed RDD that was recorded
+    // as part of the checkpoint of `time`
+    timeToOldestCheckpointFileTime.remove(time) match {
+      case Some(lastCheckpointFileTime) =>
+        // Files of RDDs older than `lastCheckpointFileTime` are not referenced by the checkpoint
+        // data of `time`, so they will not be needed even when recovering after a master failure
+        val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
+        logDebug("Files to delete:\n" + filesToDelete.mkString(","))
+        filesToDelete.foreach {
+          case (fileTime, file) =>
+            try {
+              val path = new Path(file)
+              if (fileSystem == null) {
+                fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
+              }
+              fileSystem.delete(path, true)
+              timeToCheckpointFile -= fileTime
+              logInfo("Deleted checkpoint file '" + file + "' for time " + fileTime)
+            } catch {
+              case e: Exception =>
+                logWarning("Error deleting old checkpoint file '" + file + "' for time " + fileTime, e)
+                // Discard the cached FileSystem so that the next deletion looks it up afresh
+                fileSystem = null
+            }
+        }
+      case None =>
+        logInfo("Nothing to delete")
+    }
+  }
+
+  /**
+   * Restore the checkpoint data. This gets called once when the DStream graph
+   * (along with its DStreams) is being restored from a graph checkpoint file.
+   * Default implementation restores the RDDs from their checkpoint files.
+   */
+  def restore() {
+    // Recreate each RDD from its checkpoint file and hand it back to the DStream
+    currentCheckpointFiles.foreach {
+      case (time, file) =>
+        logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
+        val restoredRDD = dstream.context.sparkContext.checkpointFile[T](file)
+        dstream.generatedRDDs += ((time, restoredRDD))
+    }
+  }
+
+  override def toString() =
+    "[\n" + currentCheckpointFiles.size + " checkpoint files \n" +
+      currentCheckpointFiles.mkString("\n") + "\n]"
+
+  @throws(classOf[IOException])
+  private def readObject(ois: ObjectInputStream) {
+    ois.defaultReadObject()
+    // The @transient maps are not serialized, so recreate them empty after deserialization
+    timeToOldestCheckpointFileTime = new HashMap[Time, Time]
+    timeToCheckpointFile = new HashMap[Time, String]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 1f0f31c..012fbb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
+import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.util.TimeStampedHashMap
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
index db2e0a4..c81534a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index 244dc3e..6586234 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import scala.reflect.ClassTag

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index 336c4b7..c7bb283 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 364abcd..905bc72 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.streaming.scheduler.Job
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
index 23136f4..a9bb51f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 8f84232..a1075ad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream}
+import org.apache.spark.streaming.{Time, Duration, StreamingContext}
 
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
index 8a04060..3d8ee29 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
index 0ce364f..7aea1f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import scala.reflect.ClassTag

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
index c0b7491..02704a8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
new file mode 100644
index 0000000..f71dd17
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream._
+
+import org.apache.spark.{Partitioner, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{ClassTag, classTag}
+
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.{Time, Duration}
+
+class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
+extends Serializable {
+
+  private[streaming] def ssc = self.ssc  // shorthand for the wrapped DStream's `ssc`
+
+  private[streaming] def defaultPartitioner(
+      numPartitions: Int = ssc.sc.defaultParallelism): HashPartitioner =
+    new HashPartitioner(numPartitions)
+
+  /**
+   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+   * generate the RDDs with Spark's default number of partitions.
+   */
+  def groupByKey(): DStream[(K, Seq[V])] =
+    // No partitioner given: hash-partition with the default parallelism
+    groupByKey(defaultPartitioner())
+
+  /**
+   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+   * generate the RDDs with `numPartitions` partitions.
+   */
+  def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] =
+    // Hash-partition into exactly `numPartitions` partitions
+    groupByKey(defaultPartitioner(numPartitions))
+
+  /**
+   * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
+   * is used to control the partitioning of each RDD.
+   */
+  def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
+    // Gather each key's values into an ArrayBuffer, then expose the result as Seq[V]
+    val newBuffer = (value: V) => ArrayBuffer[V](value)
+    val append = (buf: ArrayBuffer[V], value: V) => (buf += value)
+    val concat = (left: ArrayBuffer[V], right: ArrayBuffer[V]) => (left ++ right)
+    combineByKey(newBuffer, append, concat, partitioner).asInstanceOf[DStream[(K, Seq[V])]]
+  }
+
+  /**
+   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
+   * with Spark's default number of partitions.
+   */
+  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] =
+    // No partitioner given: hash-partition with the default parallelism
+    reduceByKey(reduceFunc, defaultPartitioner())
+
+  /**
+   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+   * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
+   * with `numPartitions` partitions.
+   */
+  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] =
+    // Hash-partition into exactly `numPartitions` partitions
+    reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
+
+  /**
+   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
+   * partitioning of each RDD.
+   */
+  def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
+    val merge = ssc.sc.clean(reduceFunc)  // serves as both mergeValue and mergeCombiner
+    combineByKey((value: V) => value, merge, merge, partitioner)
+  }
+
+  /**
+   * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
+   * combineByKey for RDDs. Please refer to combineByKey in
+   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+   */
+  def combineByKey[C: ClassTag](
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiner: (C, C) => C,
+      partitioner: Partitioner,
+      mapSideCombine: Boolean = true): DStream[(K, C)] =
+    new ShuffledDStream[K, V, C](
+      self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
+
+  /**
+   * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
+   * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
+   * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
+   * Spark's default number of partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   */
+  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] =
+    // Slide at this DStream's own interval and hash-partition with the default parallelism
+    groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
+
+  /**
+   * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
+   * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
+   * generate the RDDs with Spark's default number of partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] =
+    // No partitioner given: hash-partition with the default parallelism
+    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
+
+  /**
+   * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  number of partitions of each RDD in the new DStream (the overloads
+   *                       without this parameter use Spark's default number of partitions)
+   */
+  def groupByKeyAndWindow(
+      windowDuration: Duration,
+      slideDuration: Duration,
+      numPartitions: Int
+    ): DStream[(K, Seq[V])] = {
+    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
+   */
+  def groupByKeyAndWindow(
+      windowDuration: Duration,
+      slideDuration: Duration,
+      partitioner: Partitioner
+    ): DStream[(K, Seq[V])] = {
+    val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
+    val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+    val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
+    self.groupByKey(partitioner)
+        .window(windowDuration, slideDuration)
+        .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
+        .asInstanceOf[DStream[(K, Seq[V])]]
+  }
+
+  /**
+   * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+   * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
+   * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
+   * the RDDs with Spark's default number of partitions.
+   * @param reduceFunc associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      windowDuration: Duration
+    ): DStream[(K, V)] = {
+    reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+   * generate the RDDs with Spark's default number of partitions.
+   * @param reduceFunc associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      windowDuration: Duration,
+      slideDuration: Duration
+    ): DStream[(K, V)] = {
+    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+   * generate the RDDs with `numPartitions` partitions.
+   * @param reduceFunc associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  number of partitions of each RDD in the new DStream.
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      windowDuration: Duration,
+      slideDuration: Duration,
+      numPartitions: Int
+    ): DStream[(K, V)] = {
+    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
+   * `DStream.reduceByKey()`, but applies it over a sliding window.
+   * @param reduceFunc associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param partitioner    partitioner for controlling the partitioning of each RDD
+   *                       in the new DStream.
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      windowDuration: Duration,
+      slideDuration: Duration,
+      partitioner: Partitioner
+    ): DStream[(K, V)] = {
+    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    self.reduceByKey(cleanedReduceFunc, partitioner)
+        .window(windowDuration, slideDuration)
+        .reduceByKey(cleanedReduceFunc, partitioner)
+  }
+
+  /**
+   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+   * The reduced value over a new window is calculated using the old window's reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   *
+   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
+   * However, it is applicable to only "invertible reduce functions".
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param reduceFunc associative reduce function
+   * @param invReduceFunc inverse reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param filterFunc     Optional function to filter expired key-value pairs;
+   *                       only pairs that satisfy the function are retained
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      invReduceFunc: (V, V) => V,
+      windowDuration: Duration,
+      slideDuration: Duration = self.slideDuration,
+      numPartitions: Int = ssc.sc.defaultParallelism,
+      filterFunc: ((K, V)) => Boolean = null
+    ): DStream[(K, V)] = {
+
+    reduceByKeyAndWindow(
+      reduceFunc, invReduceFunc, windowDuration,
+      slideDuration, defaultPartitioner(numPartitions), filterFunc
+    )
+  }
+
+  /**
+   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+   * The reduced value over a new window is calculated using the old window's reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
+   * However, it is applicable to only "invertible reduce functions".
+   * @param reduceFunc     associative reduce function
+   * @param invReduceFunc  inverse reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param filterFunc     Optional function to filter expired key-value pairs;
+   *                       only pairs that satisfy the function are retained
+   */
+  def reduceByKeyAndWindow(
+      reduceFunc: (V, V) => V,
+      invReduceFunc: (V, V) => V,
+      windowDuration: Duration,
+      slideDuration: Duration,
+      partitioner: Partitioner,
+      filterFunc: ((K, V)) => Boolean
+    ): DStream[(K, V)] = {
+
+    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
+    new ReducedWindowedDStream[K, V](
+      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
+      windowDuration, slideDuration, partitioner
+    )
+  }
+
+  /**
+   * Return a new "state" DStream where the state for each key is updated by applying
+   * the given function on the previous state of the key and the new values of each key.
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param updateFunc State update function. If `this` function returns None, then
+   *                   corresponding state key-value pair will be eliminated.
+   * @tparam S State type
+   */
+  def updateStateByKey[S: ClassTag](
+      updateFunc: (Seq[V], Option[S]) => Option[S]
+    ): DStream[(K, S)] = {
+    updateStateByKey(updateFunc, defaultPartitioner())
+  }
+
+  /**
+   * Return a new "state" DStream where the state for each key is updated by applying
+   * the given function on the previous state of the key and the new values of each key.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param updateFunc State update function. If `this` function returns None, then
+   *                   corresponding state key-value pair will be eliminated.
+   * @param numPartitions Number of partitions of each RDD in the new DStream.
+   * @tparam S State type
+   */
+  def updateStateByKey[S: ClassTag](
+      updateFunc: (Seq[V], Option[S]) => Option[S],
+      numPartitions: Int
+    ): DStream[(K, S)] = {
+    updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new "state" DStream where the state for each key is updated by applying
+   * the given function on the previous state of the key and the new values of the key.
+   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * @param updateFunc State update function. If `this` function returns None, then
+   *                   corresponding state key-value pair will be eliminated.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @tparam S State type
+   */
+  def updateStateByKey[S: ClassTag](
+      updateFunc: (Seq[V], Option[S]) => Option[S],
+      partitioner: Partitioner
+    ): DStream[(K, S)] = {
+    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
+      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
+    }
+    updateStateByKey(newUpdateFunc, partitioner, true)
+  }
+
+  /**
+   * Return a new "state" DStream where the state for each key is updated by applying
+   * the given function on the previous state of the key and the new values of each key.
+   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * @param updateFunc State update function. If `this` function returns None, then
+   *                   corresponding state key-value pair will be eliminated. Note, that
+   *                   this function may generate a tuple with a different key
+   *                   than the input key. It is up to the developer to decide whether to
+   *                   remember the partitioner despite the key being changed.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
+   * @tparam S State type
+   */
+  def updateStateByKey[S: ClassTag](
+      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
+      partitioner: Partitioner,
+      rememberPartitioner: Boolean
+    ): DStream[(K, S)] = {
+     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
+  }
+
+  /**
+   * Return a new DStream by applying a map function to the value of each key-value pair in
+   * 'this' DStream without changing the key.
+   */
+  def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
+    new MapValuedDStream[K, V, U](self, mapValuesFunc)
+  }
+
+  /**
+   * Return a new DStream by applying a flatmap function to the value of each key-value pair in
+   * 'this' DStream without changing the key.
+   */
+  def flatMapValues[U: ClassTag](
+      flatMapValuesFunc: V => TraversableOnce[U]
+    ): DStream[(K, U)] = {
+    new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
+  }
+
+  /**
+   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with Spark's default number
+   * of partitions.
+   */
+  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   */
+  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+   * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
+   */
+  def cogroup[W: ClassTag](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (Seq[V], Seq[W]))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
+    )
+  }
+
+  /**
+   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   */
+  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
+    join[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   */
+  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+    join[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   */
+  def join[W: ClassTag](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (V, W))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
+    )
+  }
+
+  /**
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+    leftOuterJoin[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def leftOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (V, Option[W]))] = {
+    leftOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
+   */
+  def leftOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (V, Option[W]))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
+    )
+  }
+
+  /**
+   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+   * number of partitions.
+   */
+  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+    rightOuterJoin[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+   * partitions.
+   */
+  def rightOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (Option[V], W))] = {
+    rightOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
+   */
+  def rightOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (Option[V], W))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
+    )
+  }
+
+  /**
+   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+   */
+  def saveAsHadoopFiles[F <: OutputFormat[K, V]](
+      prefix: String,
+      suffix: String
+    )(implicit fm: ClassTag[F]) {
+    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+   */
+  def saveAsHadoopFiles(
+      prefix: String,
+      suffix: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      conf: JobConf = new JobConf
+    ) {  
+    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+      val file = rddToFileName(prefix, suffix, time)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+    }
+    self.foreach(saveFunc)
+  }
+
+  /**
+   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+   */
+  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
+      prefix: String,
+      suffix: String
+    )(implicit fm: ClassTag[F])  {
+    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+   */
+  def saveAsNewAPIHadoopFiles(
+      prefix: String,
+      suffix: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      conf: Configuration = new Configuration
+    ) {
+    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+      val file = rddToFileName(prefix, suffix, time)
+      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+    }
+    self.foreach(saveFunc)
+  }
+
+  private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
+
+  private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index db56345..7a6b1ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.storage.StorageLevel
 
 import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import org.apache.spark.streaming.{Duration, Interval, Time}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index 84e69f2..880a89b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.dstream
 import org.apache.spark.Partitioner
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import scala.reflect.ClassTag
 
 private[streaming]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index e0ff3cc..cc58329 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.Partitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Time, DStream}
+import org.apache.spark.streaming.{Duration, Time}
 
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index aeea060..7cd4554 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import scala.reflect.ClassTag
 
 private[streaming]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 0d84ec8..4ecba03 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.rdd.RDD
-import collection.mutable.ArrayBuffer
 import org.apache.spark.rdd.UnionRDD
 
 import scala.collection.mutable.ArrayBuffer

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 162b19d..e7403b5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.util
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.ForEachDStream
+import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
 import StreamingContext._
 
 import scala.util.Random

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 2e3a1e6..d293d20 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._
 
 import util.ManualClock
 import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.streaming.dstream.DStream
 
 class BasicOperationsSuite extends TestSuiteBase {
   test("map") {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 9590bca..21a72e7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -26,7 +26,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream.FileInputDStream
+import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import org.apache.spark.SparkConf

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9eb9b36..e0232c7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 import org.apache.spark.{SparkException, SparkConf, SparkContext}
 import org.apache.spark.util.{Utils, MetadataCleaner}
+import org.apache.spark.streaming.dstream.DStream
 
 class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index fa64142..9e0f2c9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming
 import org.apache.spark.streaming.scheduler._
 import scala.collection.mutable.ArrayBuffer
 import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.streaming.dstream.DStream
 
 class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 3569624..75093d6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
 import org.apache.spark.streaming.util.ManualClock
 
 import scala.collection.mutable.ArrayBuffer

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/448aef67/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index c39abfc..8f3c2dd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream.DStream
 
 class WindowOperationsSuite extends TestSuiteBase {
 


[7/9] git commit: Merge remote-tracking branch 'apache/master' into dstream-move

Posted by pw...@apache.org.
Merge remote-tracking branch 'apache/master' into dstream-move

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/777c181d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/777c181d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/777c181d

Branch: refs/heads/master
Commit: 777c181d2f583570956724f9cbe20eb1dc7048f1
Parents: 034f89a 405bfe8
Author: Tathagata Das <ta...@gmail.com>
Authored: Sun Jan 12 21:59:51 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 12 21:59:51 2014 -0800

----------------------------------------------------------------------
 docs/streaming-programming-guide.md             |  4 +--
 .../streaming/examples/RawNetworkGrep.scala     |  2 +-
 .../examples/RecoverableNetworkWordCount.scala  |  2 +-
 .../streaming/examples/TwitterAlgebirdCMS.scala |  4 +--
 .../streaming/examples/TwitterAlgebirdHLL.scala |  4 +--
 .../streaming/examples/TwitterPopularTags.scala |  4 +--
 .../examples/clickstream/PageViewStream.scala   |  2 +-
 .../streaming/api/java/JavaDStreamLike.scala    | 26 ++++++++++++++++--
 .../spark/streaming/dstream/DStream.scala       | 29 +++++++++++++++-----
 .../dstream/PairDStreamFunctions.scala          |  4 +--
 .../spark/streaming/BasicOperationsSuite.scala  |  2 +-
 .../spark/streaming/StreamingContextSuite.scala |  2 +-
 12 files changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/docs/streaming-programming-guide.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --cc examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index d51e6e9,d51e6e9..8c5d0bd
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@@ -82,7 -82,7 +82,7 @@@ object RecoverableNetworkWordCount 
      val lines = ssc.socketTextStream(ip, port)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
--    wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
++    wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
        val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
        println(counts)
        println("Appending to " + outputFile.getAbsolutePath)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 8014db6,0000000..a7c4cca
mode 100644,000000..100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@@ -1,742 -1,0 +1,757 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.streaming.dstream
 +
++
++import scala.deprecated
 +import scala.collection.mutable.HashMap
 +import scala.reflect.ClassTag
 +
- import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
++import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 +
 +import org.apache.spark.Logging
 +import org.apache.spark.rdd.RDD
 +import org.apache.spark.storage.StorageLevel
 +import org.apache.spark.util.MetadataCleaner
 +import org.apache.spark.streaming._
 +import org.apache.spark.streaming.StreamingContext._
 +import org.apache.spark.streaming.scheduler.Job
 +import org.apache.spark.streaming.Duration
 +
- 
 +/**
 + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
 + * for more details on RDDs). DStreams can either be created from live data (such as, data from
 + * HDFS, Kafka or Flume) or generated by transforming existing DStreams using operations
 + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
 + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
 + * by a parent DStream.
 + *
 + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 + * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
 + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
 + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
 + * implicit conversions when `spark.streaming.StreamingContext._` is imported.
 + *
 + * A DStream is internally characterized by a few basic properties:
 + *  - A list of other DStreams that the DStream depends on
 + *  - A time interval at which the DStream generates an RDD
 + *  - A function that is used to generate an RDD after each time interval
 + */
 +
 +abstract class DStream[T: ClassTag] (
 +    @transient private[streaming] var ssc: StreamingContext
 +  ) extends Serializable with Logging {
 +
 +  // =======================================================================
 +  // Methods that should be implemented by subclasses of DStream
 +  // =======================================================================
 +
 +  /** Time interval after which the DStream generates a RDD */
 +  def slideDuration: Duration
 +
 +  /** List of parent DStreams on which this DStream depends */
 +  def dependencies: List[DStream[_]]
 +
 +  /** Method that generates a RDD for the given time */
 +  def compute (validTime: Time): Option[RDD[T]]
 +
 +  // =======================================================================
 +  // Methods and fields available on all DStreams
 +  // =======================================================================
 +
 +  // RDDs generated, marked as private[streaming] so that testsuites can access it
 +  @transient
 +  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
 +
 +  // Time zero for the DStream
 +  private[streaming] var zeroTime: Time = null
 +
 +  // Duration for which the DStream will remember each RDD created
 +  private[streaming] var rememberDuration: Duration = null
 +
 +  // Storage level of the RDDs in the stream
 +  private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
 +
 +  // Checkpoint details
 +  private[streaming] val mustCheckpoint = false
 +  private[streaming] var checkpointDuration: Duration = null
 +  private[streaming] val checkpointData = new DStreamCheckpointData(this)
 +
 +  // Reference to whole DStream graph
 +  private[streaming] var graph: DStreamGraph = null
 +
 +  private[streaming] def isInitialized = (zeroTime != null)
 +
 +  // Duration for which the DStream requires its parent DStream to remember each RDD created
 +  private[streaming] def parentRememberDuration = rememberDuration
 +
 +  /** Return the StreamingContext associated with this DStream */
 +  def context = ssc
 +
 +  /** Persist the RDDs of this DStream with the given storage level */
 +  def persist(level: StorageLevel): DStream[T] = {
 +    if (this.isInitialized) {
 +      throw new UnsupportedOperationException(
 +        "Cannot change storage level of an DStream after streaming context has started")
 +    }
 +    this.storageLevel = level
 +    this
 +  }
 +
 +  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
 +  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
 +
 +  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
 +  def cache(): DStream[T] = persist()
 +
 +  /**
 +   * Enable periodic checkpointing of RDDs of this DStream
 +   * @param interval Time interval after which generated RDD will be checkpointed
 +   */
 +  def checkpoint(interval: Duration): DStream[T] = {
 +    if (isInitialized) {
 +      throw new UnsupportedOperationException(
 +        "Cannot change checkpoint interval of an DStream after streaming context has started")
 +    }
 +    persist()
 +    checkpointDuration = interval
 +    this
 +  }
 +
 +  /**
 +   * Initialize the DStream by setting the "zero" time, based on which
 +   * the validity of future times is calculated. This method also recursively initializes
 +   * its parent DStreams.
 +   */
 +  private[streaming] def initialize(time: Time) {
 +    if (zeroTime != null && zeroTime != time) {
 +      throw new Exception("ZeroTime is already initialized to " + zeroTime
 +        + ", cannot initialize it again to " + time)
 +    }
 +    zeroTime = time
 +
 +    // Set the checkpoint interval to be slideDuration or 10 seconds, whichever is larger
 +    if (mustCheckpoint && checkpointDuration == null) {
 +      checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
 +      logInfo("Checkpoint interval automatically set to " + checkpointDuration)
 +    }
 +
 +    // Set the minimum value of the rememberDuration if not already set
 +    var minRememberDuration = slideDuration
 +    if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
 +      minRememberDuration = checkpointDuration * 2  // times 2 just to be sure that the latest checkpoint is not forgotten
 +    }
 +    if (rememberDuration == null || rememberDuration < minRememberDuration) {
 +      rememberDuration = minRememberDuration
 +    }
 +
 +    // Initialize the dependencies
 +    dependencies.foreach(_.initialize(zeroTime))
 +  }
 +
 +  private[streaming] def validate() {
 +    assert(rememberDuration != null, "Remember duration is set to null")
 +
 +    assert(
 +      !mustCheckpoint || checkpointDuration != null,
 +      "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
 +        " Please use DStream.checkpoint() to set the interval."
 +    )
 +
 +    assert(
 +     checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
 +      "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
 +      " or SparkContext.checkpoint() to set the checkpoint directory."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || checkpointDuration >= slideDuration,
 +      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
 +        checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
 +        "Please set it to at least " + slideDuration + "."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
 +      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
 +        checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
 +        "Please set it to a multiple " + slideDuration + "."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || storageLevel != StorageLevel.NONE,
 +      "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
 +        "level has not been set to enable persisting. Please use DStream.persist() to set the " +
 +        "storage level to use memory for better checkpointing performance."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || rememberDuration > checkpointDuration,
 +      "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
 +        rememberDuration + " which is not more than the checkpoint interval (" +
 +        checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
 +    )
 +
 +    val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
 +    logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
 +    assert(
 +      metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
 +      "It seems you are doing some DStream window operation or setting a checkpoint interval " +
 +        "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
 +        "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
 +        "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
 +        "set the Java property 'spark.cleaner.delay' to more than " +
 +        math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
 +    )
 +
 +    dependencies.foreach(_.validate())
 +
 +    logInfo("Slide time = " + slideDuration)
 +    logInfo("Storage level = " + storageLevel)
 +    logInfo("Checkpoint interval = " + checkpointDuration)
 +    logInfo("Remember duration = " + rememberDuration)
 +    logInfo("Initialized and validated " + this)
 +  }
 +
 +  private[streaming] def setContext(s: StreamingContext) {
 +    if (ssc != null && ssc != s) {
 +      throw new Exception("Context is already set in " + this + ", cannot set it again")
 +    }
 +    ssc = s
 +    logInfo("Set context for " + this)
 +    dependencies.foreach(_.setContext(ssc))
 +  }
 +
 +  private[streaming] def setGraph(g: DStreamGraph) {
 +    if (graph != null && graph != g) {
 +      throw new Exception("Graph is already set in " + this + ", cannot set it again")
 +    }
 +    graph = g
 +    dependencies.foreach(_.setGraph(graph))
 +  }
 +
 +  private[streaming] def remember(duration: Duration) {
 +    if (duration != null && duration > rememberDuration) {
 +      rememberDuration = duration
 +      logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
 +    }
 +    dependencies.foreach(_.remember(parentRememberDuration))
 +  }
 +
 +  /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
 +  private[streaming] def isTimeValid(time: Time): Boolean = {
 +    if (!isInitialized) {
 +      throw new Exception (this + " has not been initialized")
 +    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
 +      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
 +      false
 +    } else {
 +      logDebug("Time " + time + " is valid")
 +      true
 +    }
 +  }
 +
 +  /**
 +   * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
 +   * method that should not be called directly.
 +   */
 +  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
 +    // If this DStream was not initialized (i.e., zeroTime not set), then do it
 +    // If RDD was already generated, then retrieve it from HashMap
 +    generatedRDDs.get(time) match {
 +
 +      // If an RDD was already generated and is being reused, then
 +      // probably all RDDs in this DStream will be reused and hence should be cached
 +      case Some(oldRDD) => Some(oldRDD)
 +
 +      // if RDD was not generated, and if the time is valid
 +      // (based on sliding time of this DStream), then generate the RDD
 +      case None => {
 +        if (isTimeValid(time)) {
 +          compute(time) match {
 +            case Some(newRDD) =>
 +              if (storageLevel != StorageLevel.NONE) {
 +                newRDD.persist(storageLevel)
 +                logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
 +              }
 +              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
 +                newRDD.checkpoint()
 +                logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
 +              }
 +              generatedRDDs.put(time, newRDD)
 +              Some(newRDD)
 +            case None =>
 +              None
 +          }
 +        } else {
 +          None
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Generate a SparkStreaming job for the given time. This is an internal method that
 +   * should not be called directly. This default implementation creates a job
 +   * that materializes the corresponding RDD. Subclasses of DStream may override this
 +   * to generate their own jobs.
 +   */
 +  private[streaming] def generateJob(time: Time): Option[Job] = {
 +    getOrCompute(time) match {
 +      case Some(rdd) => {
 +        val jobFunc = () => {
 +          val emptyFunc = { (iterator: Iterator[T]) => {} }
 +          context.sparkContext.runJob(rdd, emptyFunc)
 +        }
 +        Some(new Job(time, jobFunc))
 +      }
 +      case None => None
 +    }
 +  }
 +
 +  /**
 +   * Clear metadata that are older than `rememberDuration` of this DStream.
 +   * This is an internal method that should not be called directly. This default
 +   * implementation clears the old generated RDDs. Subclasses of DStream may override
 +   * this to clear their own metadata along with the generated RDDs.
 +   */
 +  private[streaming] def clearMetadata(time: Time) {
 +    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
 +    generatedRDDs --= oldRDDs.keys
 +    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
 +      (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
 +    dependencies.foreach(_.clearMetadata(time))
 +  }
 +
 +  /* Adds metadata to the Stream while it is running.
 +   * This method should be overridden by subclasses of InputDStream.
 +   */
 +  private[streaming] def addMetadata(metadata: Any) {
 +    if (metadata != null) {
 +      logInfo("Dropping Metadata: " + metadata.toString)
 +    }
 +  }
 +
 +  /**
 +   * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
 +   * this stream. This is an internal method that should not be called directly. This is
 +   * a default implementation that saves only the file names of the checkpointed RDDs to
 +   * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
 +   * this method to save custom checkpoint data.
 +   */
 +  private[streaming] def updateCheckpointData(currentTime: Time) {
 +    logDebug("Updating checkpoint data for time " + currentTime)
 +    checkpointData.update(currentTime)
 +    dependencies.foreach(_.updateCheckpointData(currentTime))
 +    logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
 +  }
 +
 +  private[streaming] def clearCheckpointData(time: Time) {
 +    logDebug("Clearing checkpoint data")
 +    checkpointData.cleanup(time)
 +    dependencies.foreach(_.clearCheckpointData(time))
 +    logDebug("Cleared checkpoint data")
 +  }
 +
 +  /**
 +   * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
 +   * that should not be called directly. This is a default implementation that recreates RDDs
 +   * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
 +   * override the updateCheckpointData() method would also need to override this method.
 +   */
 +  private[streaming] def restoreCheckpointData() {
 +    // Create RDDs from the checkpoint data
 +    logInfo("Restoring checkpoint data")
 +    checkpointData.restore()
 +    dependencies.foreach(_.restoreCheckpointData())
 +    logInfo("Restored checkpoint data")
 +  }
 +
 +  @throws(classOf[IOException])
 +  private def writeObject(oos: ObjectOutputStream) {
 +    logDebug(this.getClass().getSimpleName + ".writeObject used")
 +    if (graph != null) {
 +      graph.synchronized {
 +        if (graph.checkpointInProgress) {
 +          oos.defaultWriteObject()
 +        } else {
 +          val msg = "Object of " + this.getClass.getName + " is being serialized " +
 +            " possibly as a part of closure of an RDD operation. This is because " +
 +            " the DStream object is being referred to from within the closure. " +
 +            " Please rewrite the RDD operation inside this DStream to avoid this. " +
 +            " This has been enforced to avoid bloating of Spark tasks " +
 +            " with unnecessary objects."
 +          throw new java.io.NotSerializableException(msg)
 +        }
 +      }
 +    } else {
 +      throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
 +    }
 +  }
 +
 +  @throws(classOf[IOException])
 +  private def readObject(ois: ObjectInputStream) {
 +    logDebug(this.getClass().getSimpleName + ".readObject used")
 +    ois.defaultReadObject()
 +    generatedRDDs = new HashMap[Time, RDD[T]] ()
 +  }
 +
 +  // =======================================================================
 +  // DStream operations
 +  // =======================================================================
 +
 +  /** Return a new DStream by applying a function to all elements of this DStream. */
 +  def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
 +    new MappedDStream(this, context.sparkContext.clean(mapFunc))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying a function to all elements of this DStream,
 +   * and then flattening the results
 +   */
 +  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
 +    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
 +  }
 +
 +  /** Return a new DStream containing only the elements that satisfy a predicate. */
 +  def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
 +   * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
 +   * an array.
 +   */
 +  def glom(): DStream[Array[T]] = new GlommedDStream(this)
 +
 +
 +  /**
 +   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
 +   * returned DStream has exactly numPartitions partitions.
 +   */
 +  def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions))
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
 +   * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
 +   * of the RDD.
 +   */
 +  def mapPartitions[U: ClassTag](
 +      mapPartFunc: Iterator[T] => Iterator[U],
 +      preservePartitioning: Boolean = false
 +    ): DStream[U] = {
 +    new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by reducing each RDD
 +   * of this DStream.
 +   */
 +  def reduce(reduceFunc: (T, T) => T): DStream[T] =
 +    this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by counting each RDD
 +   * of this DStream.
 +   */
 +  def count(): DStream[Long] = {
 +    this.map(_ => (null, 1L))
 +        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
 +        .reduceByKey(_ + _)
 +        .map(_._2)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD contains the counts of each distinct value in
 +   * each RDD of this DStream. Hash partitioning is used to generate
 +   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
 +   * `numPartitions` not specified).
 +   */
 +  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
 +    this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
 +
 +  /**
 +   * Apply a function to each RDD in this DStream. This is an output operator, so
 +   * 'this' DStream will be registered as an output stream and therefore materialized.
 +   */
-   def foreach(foreachFunc: RDD[T] => Unit) {
-     this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
++  @deprecated("use foreachRDD", "0.9.0")
++  def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc)
++
++  /**
++   * Apply a function to each RDD in this DStream. This is an output operator, so
++   * 'this' DStream will be registered as an output stream and therefore materialized.
++   */
++  @deprecated("use foreachRDD", "0.9.0")
++  def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc)
++
++  /**
++   * Apply a function to each RDD in this DStream. This is an output operator, so
++   * 'this' DStream will be registered as an output stream and therefore materialized.
++   */
++  def foreachRDD(foreachFunc: RDD[T] => Unit) {
++    this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
 +  }
 +
 +  /**
 +   * Apply a function to each RDD in this DStream. This is an output operator, so
 +   * 'this' DStream will be registered as an output stream and therefore materialized.
 +   */
-   def foreach(foreachFunc: (RDD[T], Time) => Unit) {
++  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
 +    ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream.
 +   */
 +  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
 +    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream.
 +   */
 +  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
 +    //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
 +    val cleanedF = context.sparkContext.clean(transformFunc)
 +    val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
 +      assert(rdds.length == 1)
 +      cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
 +    }
 +    new TransformedDStream[U](Seq(this), realTransformFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream and 'other' DStream.
 +   */
 +  def transformWith[U: ClassTag, V: ClassTag](
 +      other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
 +    ): DStream[V] = {
 +    val cleanedF = ssc.sparkContext.clean(transformFunc)
 +    transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream and 'other' DStream.
 +   */
 +  def transformWith[U: ClassTag, V: ClassTag](
 +      other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
 +    ): DStream[V] = {
 +    val cleanedF = ssc.sparkContext.clean(transformFunc)
 +    val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
 +      assert(rdds.length == 2)
 +      val rdd1 = rdds(0).asInstanceOf[RDD[T]]
 +      val rdd2 = rdds(1).asInstanceOf[RDD[U]]
 +      cleanedF(rdd1, rdd2, time)
 +    }
 +    new TransformedDStream[V](Seq(this, other), realTransformFunc)
 +  }
 +
 +  /**
 +   * Print the first ten elements of each RDD generated in this DStream. This is an output
 +   * operator, so this DStream will be registered as an output stream and therefore materialized.
 +   */
 +  def print() {
 +    def foreachFunc = (rdd: RDD[T], time: Time) => {
 +      val first11 = rdd.take(11)
 +      println ("-------------------------------------------")
 +      println ("Time: " + time)
 +      println ("-------------------------------------------")
 +      first11.take(10).foreach(println)
 +      if (first11.size > 10) println("...")
 +      println()
 +    }
 +    val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
 +    ssc.registerOutputStream(newStream)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD contains all the elements seen in a
 +   * sliding window of time over this DStream. The new DStream generates RDDs with
 +   * the same interval as this DStream.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
 +   */
 +  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
 +
 +  /**
 +   * Return a new DStream in which each RDD contains all the elements seen in a
 +   * sliding window of time over this DStream.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
 +    new WindowedDStream(this, windowDuration, slideDuration)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by reducing all
 +   * elements in a sliding window over this DStream.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def reduceByWindow(
 +      reduceFunc: (T, T) => T,
 +      windowDuration: Duration,
 +      slideDuration: Duration
 +    ): DStream[T] = {
 +    this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by reducing all
 +   * elements in a sliding window over this DStream. However, the reduction is done incrementally
 +   * using the old window's reduced value :
 +   *  1. reduce the new values that entered the window (e.g., adding new counts)
 +   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 +   *  This is more efficient than reduceByWindow without "inverse reduce" function.
 +   *  However, it is applicable to only "invertible reduce functions".
 +   * @param reduceFunc associative reduce function
 +   * @param invReduceFunc inverse reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def reduceByWindow(
 +      reduceFunc: (T, T) => T,
 +      invReduceFunc: (T, T) => T,
 +      windowDuration: Duration,
 +      slideDuration: Duration
 +    ): DStream[T] = {
 +      this.map(x => (1, x))
 +          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
 +          .map(_._2)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by counting the number
 +   * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
 +   * Spark's default number of partitions.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
 +    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD contains the count of distinct elements in
 +   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
 +   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
 +   * `numPartitions` not specified).
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param numPartitions  number of partitions of each RDD in the new DStream.
 +   */
 +  def countByValueAndWindow(
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      numPartitions: Int = ssc.sc.defaultParallelism
 +    ): DStream[(T, Long)] = {
 +
 +    this.map(x => (x, 1L)).reduceByKeyAndWindow(
 +      (x: Long, y: Long) => x + y,
 +      (x: Long, y: Long) => x - y,
 +      windowDuration,
 +      slideDuration,
 +      numPartitions,
 +      (x: (T, Long)) => x._2 != 0L
 +    )
 +  }
 +
 +  /**
 +   * Return a new DStream by unifying data of another DStream with this DStream.
 +   * @param that Another DStream having the same slideDuration as this DStream.
 +   */
 +  def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
 +
 +  /**
 +   * Return all the RDDs defined by the Interval object (both end times included)
 +   */
 +  def slice(interval: Interval): Seq[RDD[T]] = {
 +    slice(interval.beginTime, interval.endTime)
 +  }
 +
 +  /**
 +   * Return all the RDDs from 'fromTime' to 'toTime' (both included, each floored to slideDuration)
 +   */
 +  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
 +    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
 +      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
 +    }
 +    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
 +      logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
 +    }
 +    val alignedToTime = toTime.floor(slideDuration)
 +    val alignedFromTime = fromTime.floor(slideDuration)
 +
 +    logInfo("Slicing from " + fromTime + " to " + toTime +
 +      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
 +
 +    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
 +      if (time >= zeroTime) getOrCompute(time) else None
 +    })
 +  }
 +
 +  /**
 +   * Save each RDD in this DStream as a Sequence file of serialized objects.
 +   * The file name at each batch interval is generated based on `prefix` and
 +   * `suffix`: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsObjectFiles(prefix: String, suffix: String = "") {
 +    val saveFunc = (rdd: RDD[T], time: Time) => {
 +      val file = rddToFileName(prefix, suffix, time)
 +      rdd.saveAsObjectFile(file)
 +    }
-     this.foreach(saveFunc)
++    this.foreachRDD(saveFunc)
 +  }
 +
 +  /**
 +   * Save each RDD in this DStream as a text file, using string representation
 +   * of elements. The file name at each batch interval is generated based on
 +   * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsTextFiles(prefix: String, suffix: String = "") {
 +    val saveFunc = (rdd: RDD[T], time: Time) => {
 +      val file = rddToFileName(prefix, suffix, time)
 +      rdd.saveAsTextFile(file)
 +    }
-     this.foreach(saveFunc)
++    this.foreachRDD(saveFunc)
 +  }
 +
 +  def register() {
 +    ssc.registerOutputStream(this)
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f71dd17,0000000..6b3e483
mode 100644,000000..100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@@ -1,622 -1,0 +1,622 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.streaming.dstream
 +
 +import org.apache.spark.streaming.StreamingContext._
 +import org.apache.spark.streaming.dstream._
 +
 +import org.apache.spark.{Partitioner, HashPartitioner}
 +import org.apache.spark.SparkContext._
 +import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
 +import org.apache.spark.storage.StorageLevel
 +
 +import scala.collection.mutable.ArrayBuffer
 +import scala.reflect.{ClassTag, classTag}
 +
 +import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 +import org.apache.hadoop.mapred.OutputFormat
 +import org.apache.hadoop.security.UserGroupInformation
 +import org.apache.hadoop.conf.Configuration
 +import org.apache.spark.streaming.{Time, Duration}
 +
 +class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 +extends Serializable {
 +
 +  private[streaming] def ssc = self.ssc
 +
 +  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
 +    new HashPartitioner(numPartitions)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
 +   * generate the RDDs with Spark's default number of partitions.
 +   */
 +  def groupByKey(): DStream[(K, Seq[V])] = {
 +    groupByKey(defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
 +   * generate the RDDs with `numPartitions` partitions.
 +   */
 +  def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
 +    groupByKey(defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
 +   * is used to control the partitioning of each RDD.
 +   */
 +  def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
 +    val createCombiner = (v: V) => ArrayBuffer[V](v)
 +    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
 +    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
 +    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
 +      .asInstanceOf[DStream[(K, Seq[V])]]
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
 +   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
 +   * with Spark's default number of partitions.
 +   */
 +  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
 +    reduceByKey(reduceFunc, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
 +   * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
 +   * with `numPartitions` partitions.
 +   */
 +  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
 +    reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
 +   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
 +   * partitioning of each RDD.
 +   */
 +  def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
 +    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
 +    combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
 +  }
 +
 +  /**
 +   * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
 +   * combineByKey for RDDs. Please refer to combineByKey in
 +   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
 +   */
 +  def combineByKey[C: ClassTag](
 +    createCombiner: V => C,
 +    mergeValue: (C, V) => C,
 +    mergeCombiner: (C, C) => C,
 +    partitioner: Partitioner,
 +    mapSideCombine: Boolean = true): DStream[(K, C)] = {
 +    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
 +   * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
 +   * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
 +   * Spark's default number of partitions.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   */
 +  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
 +    groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
 +   * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
 +   * generate the RDDs with Spark's default number of partitions.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
 +    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
 +   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
 +   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param numPartitions  number of partitions of each RDD in the new DStream; if not specified
 +   *                       then Spark's default number of partitions will be used
 +   */
 +  def groupByKeyAndWindow(
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      numPartitions: Int
 +    ): DStream[(K, Seq[V])] = {
 +    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
 +   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
 +   */
 +  def groupByKeyAndWindow(
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      partitioner: Partitioner
 +    ): DStream[(K, Seq[V])] = {
 +    val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
 +    val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
 +    val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
 +    self.groupByKey(partitioner)
 +        .window(windowDuration, slideDuration)
 +        .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
 +        .asInstanceOf[DStream[(K, Seq[V])]]
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
 +   * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
 +   * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
 +   * the RDDs with Spark's default number of partitions.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      windowDuration: Duration
 +    ): DStream[(K, V)] = {
 +    reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 +   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 +   * generate the RDDs with Spark's default number of partitions.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration
 +    ): DStream[(K, V)] = {
 +    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 +   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 +   * generate the RDDs with `numPartitions` partitions.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param numPartitions  number of partitions of each RDD in the new DStream.
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      numPartitions: Int
 +    ): DStream[(K, V)] = {
 +    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
 +   * `DStream.reduceByKey()`, but applies it over a sliding window.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param partitioner    partitioner for controlling the partitioning of each RDD
 +   *                       in the new DStream.
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      partitioner: Partitioner
 +    ): DStream[(K, V)] = {
 +    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
 +    self.reduceByKey(cleanedReduceFunc, partitioner)
 +        .window(windowDuration, slideDuration)
 +        .reduceByKey(cleanedReduceFunc, partitioner)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
 +   * The reduced value over a new window is calculated using the old window's reduced value :
 +   *  1. reduce the new values that entered the window (e.g., adding new counts)
 +   *
 +   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 +   *
 +   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
 +   * However, it is applicable to only "invertible reduce functions".
 +   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 +   * @param reduceFunc associative reduce function
 +   * @param invReduceFunc inverse reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param filterFunc     Optional function to filter expired key-value pairs;
 +   *                       only pairs that satisfy the function are retained
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      invReduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration = self.slideDuration,
 +      numPartitions: Int = ssc.sc.defaultParallelism,
 +      filterFunc: ((K, V)) => Boolean = null
 +    ): DStream[(K, V)] = {
 +
 +    reduceByKeyAndWindow(
 +      reduceFunc, invReduceFunc, windowDuration,
 +      slideDuration, defaultPartitioner(numPartitions), filterFunc
 +    )
 +  }
 +
 +  /**
 +   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
 +   * The reduced value over a new window is calculated using the old window's reduced value :
 +   *  1. reduce the new values that entered the window (e.g., adding new counts)
 +   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 +   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
 +   * However, it is applicable to only "invertible reduce functions".
 +   * @param reduceFunc     associative reduce function
 +   * @param invReduceFunc  inverse reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
 +   * @param filterFunc     Optional function to filter expired key-value pairs;
 +   *                       only pairs that satisfy the function are retained
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      invReduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      partitioner: Partitioner,
 +      filterFunc: ((K, V)) => Boolean
 +    ): DStream[(K, V)] = {
 +
 +    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
 +    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
 +    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
 +    new ReducedWindowedDStream[K, V](
 +      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
 +      windowDuration, slideDuration, partitioner
 +    )
 +  }
 +
 +  /**
 +   * Return a new "state" DStream where the state for each key is updated by applying
 +   * the given function on the previous state of the key and the new values of each key.
 +   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 +   * @param updateFunc State update function. If `this` function returns None, then
 +   *                   corresponding state key-value pair will be eliminated.
 +   * @tparam S State type
 +   */
 +  def updateStateByKey[S: ClassTag](
 +      updateFunc: (Seq[V], Option[S]) => Option[S]
 +    ): DStream[(K, S)] = {
 +    updateStateByKey(updateFunc, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new "state" DStream where the state for each key is updated by applying
 +   * the given function on the previous state of the key and the new values of each key.
 +   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
 +   * @param updateFunc State update function. If `this` function returns None, then
 +   *                   corresponding state key-value pair will be eliminated.
 +   * @param numPartitions Number of partitions of each RDD in the new DStream.
 +   * @tparam S State type
 +   */
 +  def updateStateByKey[S: ClassTag](
 +      updateFunc: (Seq[V], Option[S]) => Option[S],
 +      numPartitions: Int
 +    ): DStream[(K, S)] = {
 +    updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new "state" DStream where the state for each key is updated by applying
 +   * the given function on the previous state of the key and the new values of the key.
 +   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
 +   * @param updateFunc State update function. If `this` function returns None, then
 +   *                   corresponding state key-value pair will be eliminated.
 +   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
 +   * @tparam S State type
 +   */
 +  def updateStateByKey[S: ClassTag](
 +      updateFunc: (Seq[V], Option[S]) => Option[S],
 +      partitioner: Partitioner
 +    ): DStream[(K, S)] = {
 +    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
 +      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
 +    }
 +    updateStateByKey(newUpdateFunc, partitioner, true)
 +  }
 +
 +  /**
 +   * Return a new "state" DStream where the state for each key is updated by applying
 +   * the given function on the previous state of the key and the new values of each key.
 +   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
 +   * @param updateFunc State update function. If `this` function returns None, then
 +   *                   corresponding state key-value pair will be eliminated. Note, that
 +   *                   this function may generate a tuple with a different key
 +   *                   than the input key. It is up to the developer to decide whether to
 +   *                   remember the partitioner despite the key being changed.
 +   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
 +   * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
 +   * @tparam S State type
 +   */
 +  def updateStateByKey[S: ClassTag](
 +      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
 +      partitioner: Partitioner,
 +      rememberPartitioner: Boolean
 +    ): DStream[(K, S)] = {
 +     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying a map function to the value of each key-value pairs in
 +   * 'this' DStream without changing the key.
 +   */
 +  def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
 +    new MapValuedDStream[K, V, U](self, mapValuesFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying a flatmap function to the value of each key-value pairs in
 +   * 'this' DStream without changing the key.
 +   */
 +  def flatMapValues[U: ClassTag](
 +      flatMapValuesFunc: V => TraversableOnce[U]
 +    ): DStream[(K, U)] = {
 +    new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
 +   * Hash partitioning is used to generate the RDDs with Spark's default number
 +   * of partitions.
 +   */
 +  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
 +    cogroup(other, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
 +   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
 +   */
 +  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
 +    cogroup(other, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
 +   * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
 +   */
 +  def cogroup[W: ClassTag](
 +      other: DStream[(K, W)],
 +      partitioner: Partitioner
 +    ): DStream[(K, (Seq[V], Seq[W]))] = {
 +    self.transformWith(
 +      other,
 +      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
 +    )
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
 +   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 +   */
 +  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
 +    join[W](other, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
 +   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
 +   */
 +  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
 +    join[W](other, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
 +   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
 +   */
 +  def join[W: ClassTag](
 +      other: DStream[(K, W)],
 +      partitioner: Partitioner
 +    ): DStream[(K, (V, W))] = {
 +    self.transformWith(
 +      other,
 +      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
 +    )
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
 +   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
 +   * number of partitions.
 +   */
 +  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
 +    leftOuterJoin[W](other, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
 +   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
 +   * partitions.
 +   */
 +  def leftOuterJoin[W: ClassTag](
 +      other: DStream[(K, W)],
 +      numPartitions: Int
 +    ): DStream[(K, (V, Option[W]))] = {
 +    leftOuterJoin[W](other, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
 +   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
 +   * the partitioning of each RDD.
 +   */
 +  def leftOuterJoin[W: ClassTag](
 +      other: DStream[(K, W)],
 +      partitioner: Partitioner
 +    ): DStream[(K, (V, Option[W]))] = {
 +    self.transformWith(
 +      other,
 +      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
 +    )
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
 +   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
 +   * number of partitions.
 +   */
 +  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
 +    rightOuterJoin[W](other, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
 +   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
 +   * partitions.
 +   */
 +  def rightOuterJoin[W: ClassTag](
 +      other: DStream[(K, W)],
 +      numPartitions: Int
 +    ): DStream[(K, (Option[V], W))] = {
 +    rightOuterJoin[W](other, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
 +   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
 +   * the partitioning of each RDD.
 +   */
 +  def rightOuterJoin[W: ClassTag](
 +      other: DStream[(K, W)],
 +      partitioner: Partitioner
 +    ): DStream[(K, (Option[V], W))] = {
 +    self.transformWith(
 +      other,
 +      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
 +    )
 +  }
 +
 +  /**
 +   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
 +   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
 +   */
 +  def saveAsHadoopFiles[F <: OutputFormat[K, V]](
 +      prefix: String,
 +      suffix: String
 +    )(implicit fm: ClassTag[F]) {
 +    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
 +  }
 +
 +  /**
 +   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
 +   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
 +   */
 +  def saveAsHadoopFiles(
 +      prefix: String,
 +      suffix: String,
 +      keyClass: Class[_],
 +      valueClass: Class[_],
 +      outputFormatClass: Class[_ <: OutputFormat[_, _]],
 +      conf: JobConf = new JobConf
 +    ) {  
 +    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
 +      val file = rddToFileName(prefix, suffix, time)
 +      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
 +    }
-     self.foreach(saveFunc)
++    self.foreachRDD(saveFunc)
 +  }
 +
 +  /**
 +   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
 +   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
 +      prefix: String,
 +      suffix: String
 +    )(implicit fm: ClassTag[F])  {
 +    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
 +  }
 +
 +  /**
 +   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
 +   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsNewAPIHadoopFiles(
 +      prefix: String,
 +      suffix: String,
 +      keyClass: Class[_],
 +      valueClass: Class[_],
 +      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
 +      conf: Configuration = new Configuration
 +    ) {
 +    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
 +      val file = rddToFileName(prefix, suffix, time)
 +      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
 +    }
-     self.foreach(saveFunc)
++    self.foreachRDD(saveFunc)
 +  }
 +
 +  private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
 +
 +  private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --cc streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a4d0f9f,a477d20..f7f3346
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@@ -187,7 -186,7 +187,7 @@@ class StreamingContextSuite extends Fun
      ssc = new StreamingContext(master, appName, batchDuration)
      val inputStream = addInputStream(ssc)
      inputStream.map(x => { throw new TestException("error in map task"); x})
--               .foreach(_.count)
++               .foreachRDD(_.count)
  
      val exception = intercept[Exception] {
        ssc.start()


[4/9] git commit: Merge branch 'error-handling' into dstream-move

Posted by pw...@apache.org.
Merge branch 'error-handling' into dstream-move


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d1820fef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d1820fef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d1820fef

Branch: refs/heads/master
Commit: d1820fef574e8f559d8fba3995e21216033be303
Parents: 448aef6 c7fabb7
Author: Tathagata Das <ta...@gmail.com>
Authored: Sun Jan 12 17:36:49 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 12 17:36:49 2014 -0800

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      |  6 +--
 .../api/java/JavaStreamingContext.scala         |  6 +--
 .../spark/streaming/dstream/DStream.scala       |  6 +--
 .../streaming/scheduler/JobGenerator.scala      | 43 ++++++++++++--------
 .../streaming/scheduler/JobScheduler.scala      | 27 +++++++-----
 .../scheduler/NetworkInputTracker.scala         | 24 ++++++++---
 .../streaming/scheduler/StreamingListener.scala |  3 +-
 .../scheduler/StreamingListenerBus.scala        | 21 +++++++---
 .../spark/streaming/util/RecurringTimer.scala   | 13 +-----
 .../spark/streaming/StreamingContextSuite.scala | 26 ++++++++----
 .../apache/spark/streaming/TestSuiteBase.scala  |  5 ++-
 11 files changed, 110 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d1820fef/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d1820fef/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index fd72ebc,0000000..8014db6
mode 100644,000000..100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@@ -1,742 -1,0 +1,742 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.streaming.dstream
 +
 +import scala.collection.mutable.HashMap
 +import scala.reflect.ClassTag
 +
 +import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 +
 +import org.apache.spark.Logging
 +import org.apache.spark.rdd.RDD
 +import org.apache.spark.storage.StorageLevel
 +import org.apache.spark.util.MetadataCleaner
 +import org.apache.spark.streaming._
 +import org.apache.spark.streaming.StreamingContext._
 +import org.apache.spark.streaming.scheduler.Job
 +import org.apache.spark.streaming.Duration
 +
 +
 +/**
 + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
 + * for more details on RDDs). DStreams can either be created from live data (such as, data from
 + * HDFS, Kafka or Flume) or it can be generated by transforming existing DStreams using operations
 + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
 + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
 + * by a parent DStream.
 + *
 + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 + * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
 + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
 + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
 + * implicit conversions when `spark.streaming.StreamingContext._` is imported.
 + *
 + * Internally, a DStream is characterized by a few basic properties:
 + *  - A list of other DStreams that the DStream depends on
 + *  - A time interval at which the DStream generates an RDD
 + *  - A function that is used to generate an RDD after each time interval
 + */
 +
 +abstract class DStream[T: ClassTag] (
 +    @transient private[streaming] var ssc: StreamingContext
 +  ) extends Serializable with Logging {
 +
 +  // =======================================================================
 +  // Methods that should be implemented by subclasses of DStream
 +  // =======================================================================
 +
 +  /** Time interval after which the DStream generates a RDD */
 +  def slideDuration: Duration
 +
 +  /** List of parent DStreams on which this DStream depends */
 +  def dependencies: List[DStream[_]]
 +
 +  /** Method that generates a RDD for the given time */
 +  def compute (validTime: Time): Option[RDD[T]]
 +
 +  // =======================================================================
 +  // Methods and fields available on all DStreams
 +  // =======================================================================
 +
 +  // RDDs generated, marked as private[streaming] so that testsuites can access it
 +  @transient
 +  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
 +
 +  // Time zero for the DStream
 +  private[streaming] var zeroTime: Time = null
 +
 +  // Duration for which the DStream will remember each RDD created
 +  private[streaming] var rememberDuration: Duration = null
 +
 +  // Storage level of the RDDs in the stream
 +  private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
 +
 +  // Checkpoint details
 +  private[streaming] val mustCheckpoint = false
 +  private[streaming] var checkpointDuration: Duration = null
 +  private[streaming] val checkpointData = new DStreamCheckpointData(this)
 +
 +  // Reference to whole DStream graph
 +  private[streaming] var graph: DStreamGraph = null
 +
 +  // A DStream counts as initialized once its zero time has been assigned.
 +  private[streaming] def isInitialized: Boolean = zeroTime != null
 +
 +  // Duration for which the DStream requires its parent DStream to remember each RDD created
 +  private[streaming] def parentRememberDuration: Duration = rememberDuration
 +
 +  /** Return the StreamingContext associated with this DStream */
 +  def context: StreamingContext = ssc
 +
 +  /** Persist the RDDs of this DStream with the given storage level */
 +  def persist(level: StorageLevel): DStream[T] = {
 +    // The storage level may only be changed before the stream is initialized (zeroTime set).
 +    if (isInitialized) {
 +      throw new UnsupportedOperationException(
 +        "Cannot change storage level of an DStream after streaming context has started")
 +    }
 +    storageLevel = level
 +    this
 +  }
 +
 +  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
 +  def persist(): DStream[T] = {
 +    persist(StorageLevel.MEMORY_ONLY_SER)
 +  }
 +
 +  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
 +  def cache(): DStream[T] = {
 +    // Alias for persist() with the default storage level.
 +    persist()
 +  }
 +
 +  /**
 +   * Enable periodic checkpointing of RDDs of this DStream
 +   * @param interval Time interval after which generated RDD will be checkpointed
 +   */
 +  def checkpoint(interval: Duration): DStream[T] = {
 +    // The checkpoint interval may only be changed before the stream is initialized.
 +    if (this.isInitialized) {
 +      throw new UnsupportedOperationException(
 +        "Cannot change checkpoint interval of an DStream after streaming context has started")
 +    }
 +    // Enable persistence (default storage level) before recording the interval.
 +    persist()
 +    this.checkpointDuration = interval
 +    this
 +  }
 +
 +  /**
 +   * Initialize the DStream by setting the "zero" time, based on which
 +   * the validity of future times is calculated. This method also recursively initializes
 +   * its parent DStreams.
 +   */
 +  private[streaming] def initialize(time: Time): Unit = {
 +    // Re-initializing with the same zero time is allowed; a different one is an error.
 +    val initializedToOtherTime = zeroTime != null && zeroTime != time
 +    if (initializedToOtherTime) {
 +      throw new Exception("ZeroTime is already initialized to " + zeroTime
 +        + ", cannot initialize it again to " + time)
 +    }
 +    zeroTime = time
 +
 +    // Default checkpoint interval: the smallest multiple of slideDuration covering 10 seconds
 +    if (mustCheckpoint && checkpointDuration == null) {
 +      val slidesPerCheckpoint = math.ceil(Seconds(10) / slideDuration).toInt
 +      checkpointDuration = slideDuration * slidesPerCheckpoint
 +      logInfo("Checkpoint interval automatically set to " + checkpointDuration)
 +    }
 +
 +    // Lower bound for rememberDuration: the slide duration, or twice the checkpoint
 +    // interval when checkpointing is on, so the latest checkpointed RDD is not forgotten.
 +    val slide = slideDuration
 +    val minRememberDuration =
 +      if (checkpointDuration != null && slide <= checkpointDuration) {
 +        checkpointDuration * 2
 +      } else {
 +        slide
 +      }
 +    if (rememberDuration == null || rememberDuration < minRememberDuration) {
 +      rememberDuration = minRememberDuration
 +    }
 +
 +    // Recursively initialize the parent DStreams with the same zero time
 +    dependencies.foreach(parent => parent.initialize(zeroTime))
 +  }
 +
 +  private[streaming] def validate(): Unit = {
 +    // Checks the consistency of this stream's durations, storage level and checkpoint
 +    // settings, then recursively validates the parent streams. Any violated condition
 +    // fails an assertion with a message describing how to fix the configuration.
 +    assert(rememberDuration != null, "Remember duration is set to null")
 +
 +    assert(
 +      !mustCheckpoint || checkpointDuration != null,
 +      "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
 +        " Please use DStream.checkpoint() to set the interval."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
 +      "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
 +        " or SparkContext.checkpoint() to set the checkpoint directory."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || checkpointDuration >= slideDuration,
 +      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
 +        checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
 +        "Please set it to at least " + slideDuration + "."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
 +      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
 +        checkpointDuration + " which is not a multiple of its slide time (" + slideDuration + "). " +
 +        "Please set it to a multiple of " + slideDuration + "."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || storageLevel != StorageLevel.NONE,
 +      "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
 +        "level has not been set to enable persisting. Please use DStream.persist() to set the " +
 +        "storage level to use memory for better checkpointing performance."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || rememberDuration > checkpointDuration,
 +      "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
 +        rememberDuration + " which is not more than the checkpoint interval (" +
 +        checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
 +    )
 +
 +    val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
 +    logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
 +    assert(
 +      // A negative delay skips this check. The seconds-to-milliseconds conversion is done
 +      // in Long arithmetic so that a large delay cannot overflow before the comparison.
 +      metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000L,
 +      "It seems you are doing some DStream window operation or setting a checkpoint interval " +
 +        "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
 +        "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup " +
 +        "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
 +        "set the Java property 'spark.cleaner.delay' to more than " +
 +        math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
 +    )
 +
 +    // Parents are validated before this stream's own settings are logged.
 +    dependencies.foreach(_.validate())
 +
 +    logInfo("Slide time = " + slideDuration)
 +    logInfo("Storage level = " + storageLevel)
 +    logInfo("Checkpoint interval = " + checkpointDuration)
 +    logInfo("Remember duration = " + rememberDuration)
 +    logInfo("Initialized and validated " + this)
 +  }
 +
 +  private[streaming] def setContext(s: StreamingContext): Unit = {
 +    // Setting the same context again is allowed; switching to a different one is not.
 +    val boundToOtherContext = ssc != null && ssc != s
 +    if (boundToOtherContext) {
 +      throw new Exception("Context is already set in " + this + ", cannot set it again")
 +    }
 +    ssc = s
 +    logInfo("Set context for " + this)
 +    // Propagate the context to every parent stream.
 +    dependencies.foreach(parent => parent.setContext(ssc))
 +  }
 +
 +  private[streaming] def setGraph(g: DStreamGraph): Unit = {
 +    // Setting the same graph again is allowed; switching to a different one is not.
 +    val attachedToOtherGraph = graph != null && graph != g
 +    if (attachedToOtherGraph) {
 +      throw new Exception("Graph is already set in " + this + ", cannot set it again")
 +    }
 +    graph = g
 +    // Propagate the graph to every parent stream.
 +    dependencies.foreach(parent => parent.setGraph(graph))
 +  }
 +
 +  private[streaming] def remember(duration: Duration): Unit = {
 +    // Only ever increase the remember duration. rememberDuration starts out as null, so
 +    // treat "not yet set" as smaller than any requested duration instead of comparing
 +    // against null.
 +    if (duration != null && (rememberDuration == null || duration > rememberDuration)) {
 +      rememberDuration = duration
 +      logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
 +    }
 +    // Parents must remember RDDs for at least as long as this stream needs them.
 +    dependencies.foreach(_.remember(parentRememberDuration))
 +  }
 +
 +  /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
 +  private[streaming] def isTimeValid(time: Time): Boolean = {
 +    if (!isInitialized) {
 +      throw new Exception (this + " has not been initialized")
 +    }
 +    // Valid means strictly after zeroTime and a whole number of slide durations from it.
 +    val valid = !(time <= zeroTime) && (time - zeroTime).isMultipleOf(slideDuration)
 +    if (valid) {
 +      logDebug("Time " + time + " is valid")
 +    } else {
 +      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
 +    }
 +    valid
 +  }
 +
 +  /**
 +   * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
 +   * method that should not be called directly.
 +   */
 +  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
 +    // Reuse the RDD already generated for this time if there is one; otherwise, when
 +    // the time is valid for this stream's slide duration, compute and remember a new one.
 +    generatedRDDs.get(time).orElse {
 +      if (isTimeValid(time)) {
 +        compute(time).map { freshRDD =>
 +          // Apply the configured storage level, if any
 +          if (storageLevel != StorageLevel.NONE) {
 +            freshRDD.persist(storageLevel)
 +            logInfo("Persisting RDD " + freshRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
 +          }
 +          // Mark for checkpointing when this time falls on a checkpoint interval boundary
 +          val dueForCheckpoint =
 +            checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)
 +          if (dueForCheckpoint) {
 +            freshRDD.checkpoint()
 +            logInfo("Marking RDD " + freshRDD.id + " for time " + time + " for checkpointing at time " + time)
 +          }
 +          generatedRDDs.put(time, freshRDD)
 +          freshRDD
 +        }
 +      } else {
 +        None
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Generate a SparkStreaming job for the given time. This is an internal method that
 +   * should not be called directly. This default implementation creates a job
 +   * that materializes the corresponding RDD. Subclasses of DStream may override this
 +   * to generate their own jobs.
 +   */
 +  private[streaming] def generateJob(time: Time): Option[Job] = {
 +    // A job exists only if there is an RDD for this time.
 +    getOrCompute(time).map { rdd =>
 +      val materialize = () => {
 +        // Run a function that ignores its input over the RDD.
 +        val noOp = { (iterator: Iterator[T]) => {} }
 +        context.sparkContext.runJob(rdd, noOp)
 +      }
 +      new Job(time, materialize)
 +    }
 +  }
 +
 +  /**
 +   * Clear metadata that are older than `rememberDuration` of this DStream.
 +   * This is an internal method that should not be called directly. This default
 +   * implementation clears the old generated RDDs. Subclasses of DStream may override
 +   * this to clear their own metadata along with the generated RDDs.
 +   */
 +  private[streaming] def clearMetadata(time: Time): Unit = {
 +    // Everything generated at or before this threshold is no longer remembered.
 +    val threshold = time - rememberDuration
 +    val oldRDDs = generatedRDDs.filter { case (generatedAt, _) => generatedAt <= threshold }
 +    generatedRDDs --= oldRDDs.keys
 +    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
 +      threshold + ": " + oldRDDs.keys.mkString(", "))
 +    // Let the parent streams drop their old RDDs as well.
 +    dependencies.foreach(parent => parent.clearMetadata(time))
 +  }
 +
 +  /* Adds metadata to the Stream while it is running.
 +   * This method should be overridden by subclasses of InputDStream.
 +   */
 +  private[streaming] def addMetadata(metadata: Any): Unit = {
 +    // Default behaviour: non-null metadata is only logged, then discarded.
 +    Option(metadata).foreach { dropped =>
 +      logInfo("Dropping Metadata: " + dropped.toString)
 +    }
 +  }
 +
 +  /**
 +   * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
 +   * this stream. This is an internal method that should not be called directly. This is
 +   * a default implementation that saves only the file names of the checkpointed RDDs to
 +   * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
 +   * this method to save custom checkpoint data.
 +   */
 +  private[streaming] def updateCheckpointData(currentTime: Time) {
-     logInfo("Updating checkpoint data for time " + currentTime)
++    logDebug("Updating checkpoint data for time " + currentTime)
 +    // Update this stream's own checkpoint data first, then recurse into the parent streams.
 +    checkpointData.update(currentTime)
 +    dependencies.foreach(_.updateCheckpointData(currentTime))
 +    logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
 +  }
 +
 +  private[streaming] def clearCheckpointData(time: Time) {
-     logInfo("Clearing checkpoint data")
++    logDebug("Clearing checkpoint data")
 +    // Clean up this stream's checkpoint data for the given time, then recurse into the
 +    // parent streams.
 +    checkpointData.cleanup(time)
 +    dependencies.foreach(_.clearCheckpointData(time))
-     logInfo("Cleared checkpoint data")
++    logDebug("Cleared checkpoint data")
 +  }
 +
 +  /**
 +   * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
 +   * that should not be called directly. This is a default implementation that recreates RDDs
 +   * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
 +   * override the updateCheckpointData() method would also need to override this method.
 +   */
 +  private[streaming] def restoreCheckpointData(): Unit = {
 +    logInfo("Restoring checkpoint data")
 +    // Restore this stream's own checkpoint data first, then the parents' recursively.
 +    checkpointData.restore()
 +    dependencies.foreach(parent => parent.restoreCheckpointData())
 +    logInfo("Restored checkpoint data")
 +  }
 +
 +  @throws(classOf[IOException])
 +  private def writeObject(oos: ObjectOutputStream): Unit = {
 +    logDebug(this.getClass().getSimpleName + ".writeObject used")
 +    // Without a graph there is no way to tell whether a checkpoint is in progress.
 +    if (graph == null) {
 +      throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
 +    }
 +    graph.synchronized {
 +      // Serialization is only permitted while the graph is being checkpointed; any other
 +      // attempt is rejected with an explanatory message.
 +      if (!graph.checkpointInProgress) {
 +        val msg = "Object of " + this.getClass.getName + " is being serialized " +
 +          " possibly as a part of closure of an RDD operation. This is because " +
 +          " the DStream object is being referred to from within the closure. " +
 +          " Please rewrite the RDD operation inside this DStream to avoid this. " +
 +          " This has been enforced to avoid bloating of Spark tasks " +
 +          " with unnecessary objects."
 +        throw new java.io.NotSerializableException(msg)
 +      }
 +      oos.defaultWriteObject()
 +    }
 +  }
 +
 +  @throws(classOf[IOException])
 +  private def readObject(ois: ObjectInputStream) {
 +    logDebug(this.getClass().getSimpleName + ".readObject used")
 +    ois.defaultReadObject()
 +    // generatedRDDs is declared @transient, so it is replaced with a fresh empty map
 +    // after the default fields have been read.
 +    generatedRDDs = new HashMap[Time, RDD[T]] ()
 +  }
 +
 +  // =======================================================================
 +  // DStream operations
 +  // =======================================================================
 +
 +  /** Return a new DStream by applying a function to all elements of this DStream. */
 +  def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
 +    // Clean the closure before handing it to the mapped stream.
 +    val cleanedMapFunc = context.sparkContext.clean(mapFunc)
 +    new MappedDStream(this, cleanedMapFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying a function to all elements of this DStream,
 +   * and then flattening the results
 +   */
 +  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
 +    // Clean the closure before handing it to the flat-mapped stream.
 +    val cleanedFlatMapFunc = context.sparkContext.clean(flatMapFunc)
 +    new FlatMappedDStream(this, cleanedFlatMapFunc)
 +  }
 +
 +  /** Return a new DStream containing only the elements that satisfy a predicate. */
 +  def filter(filterFunc: T => Boolean): DStream[T] = {
 +    // Clean the closure like map/flatMap/mapPartitions do, so the predicate handed to
 +    // the filtered stream is treated the same way as the other element-wise operations.
 +    new FilteredDStream(this, context.sparkContext.clean(filterFunc))
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
 +   * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
 +   * an array.
 +   */
 +  def glom(): DStream[Array[T]] = {
 +    new GlommedDStream(this)
 +  }
 +
 +
 +  /**
 +   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
 +   * returned DStream has exactly numPartitions partitions.
 +   */
 +  def repartition(numPartitions: Int): DStream[T] = {
 +    // Repartition every RDD of this stream individually.
 +    this.transform(rdd => rdd.repartition(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
 +   * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
 +   * of the RDD.
 +   */
 +  def mapPartitions[U: ClassTag](
 +      mapPartFunc: Iterator[T] => Iterator[U],
 +      preservePartitioning: Boolean = false
 +    ): DStream[U] = {
 +    // Clean the closure before handing it to the partition-mapped stream.
 +    val cleanedPartFunc = context.sparkContext.clean(mapPartFunc)
 +    new MapPartitionedDStream(this, cleanedPartFunc, preservePartitioning)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by reducing each RDD
 +   * of this DStream.
 +   */
 +  def reduce(reduceFunc: (T, T) => T): DStream[T] = {
 +    // Put every element under the same (null) key, reduce by key into a single
 +    // partition, then drop the key again.
 +    val keyed = this.map(x => (null, x))
 +    val reduced = keyed.reduceByKey(reduceFunc, 1)
 +    reduced.map(_._2)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by counting each RDD
 +   * of this DStream.
 +   */
 +  def count(): DStream[Long] = {
 +    // One (null, 1L) pair per element
 +    val ones = this.map(_ => (null, 1L))
 +    // Union in a single (null, 0L) pair so that every batch has at least one pair to reduce
 +    val onesWithZero =
 +      ones.transform(rdd => rdd.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
 +    onesWithZero.reduceByKey(_ + _).map(_._2)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD contains the counts of each distinct value in
 +   * each RDD of this DStream. Hash partitioning is used to generate
 +   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
 +   * `numPartitions` not specified).
 +   */
 +  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] = {
 +    // Pair each value with 1 and sum the ones per distinct value.
 +    val add = (x: Long, y: Long) => x + y
 +    this.map(x => (x, 1L)).reduceByKey(add, numPartitions)
 +  }
 +
 +  /**
 +   * Apply a function to each RDD in this DStream. This is an output operator, so
 +   * 'this' DStream will be registered as an output stream and therefore materialized.
 +   */
 +  def foreach(foreachFunc: RDD[T] => Unit): Unit = {
 +    // Adapt to the (RDD, Time) overload; the batch time is ignored.
 +    val withTime = (r: RDD[T], t: Time) => foreachFunc(r)
 +    this.foreach(withTime)
 +  }
 +
 +  /**
 +   * Apply a function to each RDD in this DStream. This is an output operator, so
 +   * 'this' DStream will be registered as an output stream and therefore materialized.
 +   */
 +  def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = {
 +    // Wrap the cleaned function in a ForEachDStream and register it as an output stream.
 +    val cleanedFunc = context.sparkContext.clean(foreachFunc)
 +    val outputStream = new ForEachDStream(this, cleanedFunc)
 +    ssc.registerOutputStream(outputStream)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream.
 +   */
 +  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
 +    // Clean the user-supplied closure itself, once, up front. Previously clean() was
 +    // applied to the RDD returned by transformFunc(r) on every batch, which left the
 +    // closure uncleaned. The batch time is ignored by this overload.
 +    val cleanedF = context.sparkContext.clean(transformFunc)
 +    transform((r: RDD[T], t: Time) => cleanedF(r))
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream.
 +   */
 +  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
 +    val cleanedFunc = context.sparkContext.clean(transformFunc)
 +    // TransformedDStream takes a function over a sequence of parent RDDs; with a single
 +    // parent that sequence must hold exactly one RDD.
 +    val seqTransformFunc = (parentRDDs: Seq[RDD[_]], time: Time) => {
 +      assert(parentRDDs.length == 1)
 +      val onlyRDD = parentRDDs.head.asInstanceOf[RDD[T]]
 +      cleanedFunc(onlyRDD, time)
 +    }
 +    new TransformedDStream[U](Seq(this), seqTransformFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream and 'other' DStream.
 +   */
 +  def transformWith[U: ClassTag, V: ClassTag](
 +      other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
 +    ): DStream[V] = {
 +    val cleanedFunc = ssc.sparkContext.clean(transformFunc)
 +    // Adapt to the time-aware overload; the batch time is ignored.
 +    val withTime = (left: RDD[T], right: RDD[U], time: Time) => cleanedFunc(left, right)
 +    transformWith(other, withTime)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream and 'other' DStream.
 +   */
 +  def transformWith[U: ClassTag, V: ClassTag](
 +      other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
 +    ): DStream[V] = {
 +    val cleanedFunc = ssc.sparkContext.clean(transformFunc)
 +    // The parents are passed as Seq(this, other), so index 0 is this stream's RDD and
 +    // index 1 is the other stream's RDD.
 +    val seqTransformFunc = (parentRDDs: Seq[RDD[_]], time: Time) => {
 +      assert(parentRDDs.length == 2)
 +      val left = parentRDDs(0).asInstanceOf[RDD[T]]
 +      val right = parentRDDs(1).asInstanceOf[RDD[U]]
 +      cleanedFunc(left, right, time)
 +    }
 +    new TransformedDStream[V](Seq(this, other), seqTransformFunc)
 +  }
 +
 +  /**
 +   * Print the first ten elements of each RDD generated in this DStream. This is an output
 +   * operator, so this DStream will be registered as an output stream and therefore materialized.
 +   */
 +  def print(): Unit = {
 +    val printBatch = (rdd: RDD[T], time: Time) => {
 +      // Fetch one element more than is displayed so truncation can be signalled with "..."
 +      val sample = rdd.take(11)
 +      println ("-------------------------------------------")
 +      println ("Time: " + time)
 +      println ("-------------------------------------------")
 +      sample.take(10).foreach(println)
 +      if (sample.size > 10) println("...")
 +      println()
 +    }
 +    // Register the printing stream as an output stream.
 +    val printStream = new ForEachDStream(this, context.sparkContext.clean(printBatch))
 +    ssc.registerOutputStream(printStream)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD contains all the elements seen in a
 +   * sliding window of time over this DStream. The new DStream generates RDDs with
 +   * the same interval as this DStream.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
 +   */
 +  def window(windowDuration: Duration): DStream[T] = {
 +    // Slide at this stream's own interval.
 +    window(windowDuration, this.slideDuration)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD contains all the elements seen in a
 +   * sliding window of time over this DStream.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
 +    val windowed = new WindowedDStream(this, windowDuration, slideDuration)
 +    windowed
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by reducing all
 +   * elements in a sliding window over this DStream.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def reduceByWindow(
 +      reduceFunc: (T, T) => T,
 +      windowDuration: Duration,
 +      slideDuration: Duration
 +    ): DStream[T] = {
 +    // Reduce each batch first, window the per-batch results, then reduce across the window.
 +    val perBatch = this.reduce(reduceFunc)
 +    val windowed = perBatch.window(windowDuration, slideDuration)
 +    windowed.reduce(reduceFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by reducing all
 +   * elements in a sliding window over this DStream. However, the reduction is done incrementally
 +   * using the old window's reduced value :
 +   *  1. reduce the new values that entered the window (e.g., adding new counts)
 +   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 +   *  This is more efficient than reduceByWindow without "inverse reduce" function.
 +   *  However, it is applicable to only "invertible reduce functions".
 +   * @param reduceFunc associative reduce function
 +   * @param invReduceFunc inverse reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def reduceByWindow(
 +      reduceFunc: (T, T) => T,
 +      invReduceFunc: (T, T) => T,
 +      windowDuration: Duration,
 +      slideDuration: Duration
 +    ): DStream[T] = {
 +    // Key every element by the constant 1, reuse the keyed incremental window
 +    // reduction with a single partition, then drop the key.
 +    val keyed = this.map(x => (1, x))
 +    val reduced =
 +      keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
 +    reduced.map(_._2)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by counting the number
 +   * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
 +   * Spark's default number of partitions.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
 +    // Map every element to 1L; add ones entering the window, subtract ones leaving it.
 +    val ones = this.map(_ => 1L)
 +    ones.reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
 +  }
 +
 +  /**
 +   * Return a new DStream of `(value, count)` pairs, where `count` is the number of times
 +   * `value` occurs in a sliding window over this DStream. Counts are maintained with an
 +   * add / subtract pair of reduce functions, and pairs whose count is zero are filtered
 +   * out. `numPartitions` is forwarded to `reduceByKeyAndWindow` and defaults to
 +   * `ssc.sc.defaultParallelism`.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param numPartitions  number of partitions of each RDD in the new DStream.
 +   */
 +  def countByValueAndWindow(
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      numPartitions: Int = ssc.sc.defaultParallelism
 +    ): DStream[(T, Long)] = {
 +    val add = (a: Long, b: Long) => a + b
 +    val subtract = (a: Long, b: Long) => a - b
 +    // Keep only the values that still have a non-zero count.
 +    val hasOccurrences = (entry: (T, Long)) => entry._2 != 0L
 +    val withOnes = this.map(value => (value, 1L))
 +    withOnes.reduceByKeyAndWindow(
 +      add, subtract, windowDuration, slideDuration, numPartitions, hasOccurrences)
 +  }
 +
 +  /**
 +   * Return a new DStream that combines the data of this DStream with that of another one.
 +   * @param that Another DStream having the same slideDuration as this DStream.
 +   */
 +  def union(that: DStream[T]): DStream[T] = {
 +    val parents: Array[DStream[T]] = Array(this, that)
 +    new UnionDStream[T](parents)
 +  }
 +
 +  /**
 +   * Return all the RDDs covered by the given Interval (both end times included), by
 +   * delegating to `slice(fromTime, toTime)` with the interval's begin and end times.
 +   */
 +  def slice(interval: Interval): Seq[RDD[T]] =
 +    slice(interval.beginTime, interval.endTime)
 +
 +  /**
 +   * Return all the RDDs between 'fromTime' to 'toTime' (both included).
 +   * A warning is logged for each bound whose offset from `zeroTime` is not a multiple of
 +   * `slideDuration`; both bounds are then floored to `slideDuration` before slicing.
 +   * Times earlier than `zeroTime` contribute no RDD.
 +   * @param fromTime start of the range (inclusive)
 +   * @param toTime   end of the range (inclusive)
 +   * @return the RDDs obtained from `getOrCompute` for the aligned times in the range
 +   */
 +  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
 +    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
 +      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" +
 +        slideDuration + ")")
 +    }
 +    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
 +      // Report the offending toTime itself (this message previously printed fromTime).
 +      logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" +
 +        slideDuration + ")")
 +    }
 +    // NOTE(review): the checks above are relative to zeroTime, while floor() is applied to
 +    // the absolute times; confirm these agree when zeroTime is not itself aligned.
 +    val alignedToTime = toTime.floor(slideDuration)
 +    val alignedFromTime = fromTime.floor(slideDuration)
 +
 +    logInfo("Slicing from " + fromTime + " to " + toTime +
 +      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
 +
 +    // Step through the aligned range one slideDuration at a time, skipping anything
 +    // that falls before zeroTime.
 +    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
 +      if (time >= zeroTime) getOrCompute(time) else None
 +    })
 +  }
 +
 +  /**
 +   * Save each RDD in this DStream as a Sequence file of serialized objects.
 +   * For every batch the target file name is built by `rddToFileName` from `prefix`,
 +   * `suffix` and the batch time: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = {
 +    this.foreach((rdd: RDD[T], time: Time) => {
 +      rdd.saveAsObjectFile(rddToFileName(prefix, suffix, time))
 +    })
 +  }
 +
 +  /**
 +   * Save each RDD in this DStream as a text file, using string representation
 +   * of elements. For every batch the target file name is built by `rddToFileName` from
 +   * `prefix`, `suffix` and the batch time: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = {
 +    this.foreach((rdd: RDD[T], time: Time) => {
 +      rdd.saveAsTextFile(rddToFileName(prefix, suffix, time))
 +    })
 +  }
 +
 +  /**
 +   * Register this DStream with its StreamingContext (`ssc`) as an output stream.
 +   */
 +  def register(): Unit = {
 +    ssc.registerOutputStream(this)
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d1820fef/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d1820fef/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------


[8/9] git commit: Fixed import formatting.

Posted by pw...@apache.org.
Fixed import formatting.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ffa1d38e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ffa1d38e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ffa1d38e

Branch: refs/heads/master
Commit: ffa1d38ef19a7d5c5c2fc173d1d2f54267449f80
Parents: 777c181
Author: Tathagata Das <ta...@gmail.com>
Authored: Sun Jan 12 22:27:07 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 12 22:27:07 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala   | 2 +-
 .../main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala   | 2 +-
 .../src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala | 2 +-
 .../scala/org/apache/spark/streaming/twitter/TwitterUtils.scala    | 2 +-
 .../main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala | 2 +-
 .../scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala  | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ffa1d38e/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index a6af53e..d53b66d 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.flume
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
 import org.apache.spark.streaming.dstream.DStream
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ffa1d38e/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 76f9c46..37c03be 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -26,7 +26,7 @@ import java.util.{Map => JMap}
 import kafka.serializer.{Decoder, StringDecoder}
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
 import org.apache.spark.streaming.dstream.DStream
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ffa1d38e/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index caa86b2..3636e46 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.mqtt
 
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
 import scala.reflect.ClassTag
 import org.apache.spark.streaming.dstream.DStream

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ffa1d38e/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index a23d685..b8bae7b 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.twitter
 import twitter4j.Status
 import twitter4j.auth.Authorization
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.DStream
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ffa1d38e/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 669eb0d..7a14b3d 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -25,7 +25,7 @@ import akka.zeromq.Subscribe
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
-import org.apache.spark.streaming.{StreamingContext}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
 import org.apache.spark.streaming.dstream.DStream
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ffa1d38e/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index 475569c..4886cd6 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark._
 import org.apache.spark.api.java._
 import org.apache.spark.rdd.{RDD, DoubleRDDFunctions, PairRDDFunctions, OrderedRDDFunctions}
-import org.apache.spark.streaming.{StreamingContext}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}