Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 09:05:47 UTC

[2/5] git commit: Removed StreamingContext.registerInputStream and registerOutputStream - they were useless as InputDStream has been made to register itself. Also made DStream.register() private[streaming] - not useful to expose the confusing function. U

Removed StreamingContext.registerInputStream and registerOutputStream - they were useless as InputDStream has been made to register itself. Also made DStream.register() private[streaming] - not useful to expose the confusing function. Updated a lot of documentation.
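
For context, a minimal sketch (not part of the commit; master, host, and port values are placeholders) of how user code reads after this change: input streams register themselves with the DStream graph when they are constructed, and output operations such as print() register the output stream internally, so the removed registerInputStream/registerOutputStream calls are simply gone.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object ImplicitRegistrationSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "ImplicitRegistrationSketch", Seconds(1))

    // The InputDStream constructor now adds the stream to the DStreamGraph itself,
    // so there is no ssc.registerInputStream(lines) call any more.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Output operations (print, foreachRDD, the save* methods) register the output
    // stream internally, replacing the removed ssc.registerOutputStream(...).
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
  }
}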


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4e497db8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4e497db8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4e497db8

Branch: refs/heads/master
Commit: 4e497db8f3826cf5142b2165a08d02c6f3c2cd90
Parents: 1233b3d
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Jan 13 23:23:46 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jan 13 23:23:46 2014 -0800

----------------------------------------------------------------------
 docs/streaming-programming-guide.md             | 32 ++++++++--------
 .../streaming/examples/NetworkWordCount.scala   |  3 +-
 .../spark/streaming/flume/FlumeUtils.scala      |  1 -
 .../streaming/flume/FlumeStreamSuite.scala      |  2 +-
 .../spark/streaming/kafka/KafkaUtils.scala      |  4 +-
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |  4 +-
 .../spark/streaming/twitter/TwitterUtils.scala  |  4 +-
 .../spark/streaming/StreamingContext.scala      | 40 +++-----------------
 .../api/java/JavaStreamingContext.scala         |  7 ----
 .../spark/streaming/dstream/DStream.scala       | 25 +++++++-----
 .../spark/streaming/dstream/InputDStream.scala  | 24 +++++++-----
 .../streaming/dstream/NetworkInputDStream.scala |  9 +++--
 .../dstream/PairDStreamFunctions.scala          |  5 +++
 .../org/apache/spark/streaming/package.scala    | 38 +++++++++++++++++++
 .../streaming/util/MasterFailureTest.scala      |  2 +-
 .../apache/spark/streaming/JavaTestUtils.scala  |  3 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  1 -
 .../spark/streaming/CheckpointSuite.scala       |  2 +-
 .../spark/streaming/InputStreamsSuite.scala     |  8 ++--
 .../spark/streaming/StreamingContextSuite.scala |  1 -
 .../apache/spark/streaming/TestSuiteBase.scala  |  7 +---
 21 files changed, 115 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 4e8a680..1495af2 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -21,6 +21,8 @@ Add the following SBT or Maven dependency to your project to use Spark Streaming
     artifactId = spark-streaming_{{site.SCALA_VERSION}}
     version = {{site.SPARK_VERSION}}
 
+For ingesting data from sources like Kafka and Flume, add the corresponding artifact `spark-streaming-xyz_{{site.SCALA_VERSION}}` to the dependencies. For example, `spark-streaming-kafka_{{site.SCALA_VERSION}}` for Kafka, `spark-streaming-flume_{{site.SCALA_VERSION}}` for Flume, etc. Please refer to the [Apache repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION}}%22) for the full list of supported sources / artifacts.
+
 # Initializing Spark Streaming
 The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using
 
@@ -28,26 +30,28 @@ The first thing a Spark Streaming program must do is create a `StreamingContext`
 new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
 {% endhighlight %}
 
-The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
+The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is the name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are optional parameters, which need to be set when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
 
-This constructor creates a SparkContext for your job as well, which can be accessed with `streamingContext.sparkContext`.
+{% highlight scala %}
+new StreamingContext(conf, batchDuration)
+{% endhighlight %}
 
+where `conf` is a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
+object used for more advanced configuration. In both cases, a [SparkContext](api/core/index.html#org.apache.spark.SparkContext) is created as well which can be accessed with `streamingContext.sparkContext`.
 
-# Attaching Input Sources - InputDStreams
-The StreamingContext is used to creating InputDStreams from input sources:
+# Attaching Input Sources
+The StreamingContext is used to create input streams from data sources:
 
 {% highlight scala %}
 // Assuming ssc is the StreamingContext
-ssc.textFileStream(directory)      // Creates a stream by monitoring and processing new files in a HDFS directory
-ssc.socketStream(hostname, port)   // Creates a stream that uses a TCP socket to read data from hostname:port
+ssc.textFileStream(directory)    // Creates a stream that monitors and processes new files in a HDFS directory
+ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
 {% endhighlight %}
 
-We also provide a input streams for Kafka, Flume, Akka actor, etc. For a complete list of input streams, take a look at the [StreamingContext API documentation](api/streaming/index.html#org.apache.spark.streaming.StreamingContext).
-
-
+The core Spark Streaming API provides input streams for files, sockets, and Akka actors. Additional functionality for Kafka, Flume, ZeroMQ, Twitter, etc. can be imported by adding the right dependencies as explained in the [linking](#linking-with-spark-streaming) section.
 
 # DStream Operations
-Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, you'll need to call the output operations, which writies data out to an external source. 
+Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, output operations need to be called, which write data out to an external data sink like a file system or a database.
 
 ## Transformations
 
@@ -234,7 +238,7 @@ wordCounts.print()
 ssc.start()
 {% endhighlight %}
 
-The `socketTextStream` returns a DStream of lines received from a TCP socket-based source. The `lines` DStream is _transformed_ into a DStream using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second.
+The `socketTextStream` returns a DStream of text data received from a TCP server socket. The `lines` DStream is _transformed_ into a DStream of words using the `flatMap` operation, which splits each line into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second.
 
 To run this example on your local machine, you need to first run a Netcat server by using
 
@@ -270,14 +274,12 @@ hello world
 {% highlight bash %}
 # TERMINAL 2: RUNNING NetworkWordCount
 ...
-2012-12-31 18:47:10,446 INFO SparkContext: Job finished: run at ThreadPoolExecutor.java:886, took 0.038817 s
 -------------------------------------------
 Time: 1357008430000 ms
 -------------------------------------------
 (hello,1)
 (world,1)
 
-2012-12-31 18:47:10,447 INFO JobManager: Total delay: 0.44700 s for job 8 (execution: 0.44000 s)
 ...
 {% endhighlight %}
 </td>
@@ -384,7 +386,7 @@ A system that is required to operate 24/7 needs to be able tolerate the failure
 1. The configuration of each DStream (checkpoint interval, etc.)
 1. The RDD checkpoint files of each DStream
 
-All this is periodically saved in the file `<checkpoint directory>/graph`. To recover, a new Streaming Context can be created with this directory by using
+All this is periodically saved in the checkpoint directory. To recover, a new `StreamingContext` can be created with this directory by using
 
 {% highlight scala %}
 val ssc = new StreamingContext(checkpointDirectory)
@@ -395,7 +397,7 @@ On calling `ssc.start()` on this new context, the following steps are taken by t
 1. Schedule the transformations and output operations for all the time steps between the time when the driver failed and when it last checkpointed. This is also done for those time steps that were previously scheduled but not processed due to the failure. This will make the system recompute all the intermediate data from the checkpointed RDD files, etc.
 1. Restart the network receivers, if any, and continue receiving new data.
 
-In the current _alpha_ release, there are two different failure behaviors based on which input sources are used.
+There are two different failure behaviors based on which input sources are used.
 
 1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can be re-computed and therefore no data will be lost due to any failure.
 1. _Using any input source that receives data through a network_ - The received input data is replicated in memory to multiple nodes. Since all the data in the Spark workers' memory is lost when the Spark driver fails, the past input data will not be accessible when the driver recovers. Hence, if stateful and window-based operations are used (like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state will not be recovered completely.
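
As a sketch of the recovery path described above (not part of the diff; the checkpoint path is a placeholder), a driver restarted after a failure only needs the checkpoint directory to rebuild its context:

import org.apache.spark.streaming.StreamingContext

object CheckpointRecoverySketch {
  def main(args: Array[String]) {
    // Placeholder path; in practice this is the directory passed to ssc.checkpoint(...).
    val checkpointDirectory = "hdfs://namenode:8020/user/spark/streaming-checkpoint"

    // Recreates the StreamingContext, and with it the DStream graph, from the
    // periodically saved checkpoint data.
    val ssc = new StreamingContext(checkpointDirectory)

    // start() reschedules the batches between the last checkpoint and the failure,
    // and restarts any network receivers, as described in the steps above.
    ssc.start()
  }
}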

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 25f7013..0226475 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples
 
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.storage.StorageLevel
 
 /**
  * Counts words in text encoded with UTF8 received from the network every second.
@@ -48,7 +49,7 @@ object NetworkWordCount {
 
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    val lines = ssc.socketTextStream(args(1), args(2).toInt)
+    val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index d53b66d..654ba45 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -37,7 +37,6 @@ object FlumeUtils {
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): DStream[SparkFlumeEvent] = {
     val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
-    ssc.registerInputStream(inputStream)
     inputStream
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 2e8e9fa..8bc4397 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -43,7 +43,7 @@ class FlumeStreamSuite extends TestSuiteBase {
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc.start()
 
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 37c03be..15a2daa 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -71,9 +71,7 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): DStream[(K, V)] = {
-    val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
-    ssc.registerInputStream(inputStream)
-    inputStream
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
   }
 
   /**
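
For illustration (not part of the commit), user code for the external connectors is unchanged by this cleanup, because the DStream returned by the createStream methods now registers itself on construction. The sketch below assumes the simple string-serializer overload of KafkaUtils.createStream; the ZooKeeper quorum, group id, and topic map are placeholders.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreamSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "KafkaStreamSketch", Seconds(2))

    // Returns a DStream of (key, message) pairs; the underlying KafkaInputDStream
    // is registered with the graph by its own constructor now.
    val messages = KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("pageviews" -> 1))

    messages.map(_._2).print()   // print() registers the output stream internally
    ssc.start()
  }
}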

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 3636e46..1b09ee5 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -37,9 +37,7 @@ object MQTTUtils {
       topic: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): DStream[String] = {
-    val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
-    ssc.registerInputStream(inputStream)
-    inputStream
+    new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index b8bae7b..e8433b7 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -41,9 +41,7 @@ object TwitterUtils {
       filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): DStream[Status] = {
-    val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
-    ssc.registerInputStream(inputStream)
-    inputStream
+    new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 7b27933..26257e6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -199,10 +199,7 @@ class StreamingContext private[streaming] (
    */
   def networkStream[T: ClassTag](
     receiver: NetworkReceiver[T]): DStream[T] = {
-    val inputStream = new PluggableInputDStream[T](this,
-      receiver)
-    graph.addInputStream(inputStream)
-    inputStream
+    new PluggableInputDStream[T](this, receiver)
   }
 
   /**
@@ -259,9 +256,7 @@ class StreamingContext private[streaming] (
       converter: (InputStream) => Iterator[T],
       storageLevel: StorageLevel
     ): DStream[T] = {
-    val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
-    registerInputStream(inputStream)
-    inputStream
+    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
   }
 
   /**
@@ -280,9 +275,7 @@ class StreamingContext private[streaming] (
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): DStream[T] = {
-    val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
-    registerInputStream(inputStream)
-    inputStream
+    new RawInputDStream[T](this, hostname, port, storageLevel)
   }
 
   /**
@@ -300,9 +293,7 @@ class StreamingContext private[streaming] (
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
   ] (directory: String): DStream[(K, V)] = {
-    val inputStream = new FileInputDStream[K, V, F](this, directory)
-    registerInputStream(inputStream)
-    inputStream
+    new FileInputDStream[K, V, F](this, directory)
   }
 
   /**
@@ -322,9 +313,7 @@ class StreamingContext private[streaming] (
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
   ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
-    val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
-    registerInputStream(inputStream)
-    inputStream
+    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
   }
 
   /**
@@ -367,9 +356,7 @@ class StreamingContext private[streaming] (
       oneAtATime: Boolean,
       defaultRDD: RDD[T]
     ): DStream[T] = {
-    val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
-    registerInputStream(inputStream)
-    inputStream
+    new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
   }
 
   /**
@@ -390,21 +377,6 @@ class StreamingContext private[streaming] (
     new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
   }
 
-  /**
-   * Register an input stream that will be started (InputDStream.start() called) to get the
-   * input data.
-   */
-  def registerInputStream(inputStream: InputDStream[_]) {
-    graph.addInputStream(inputStream)
-  }
-
-  /**
-   * Register an output stream that will be computed every interval
-   */
-  def registerOutputStream(outputStream: DStream[_]) {
-    graph.addOutputStream(outputStream)
-  }
-
   /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
     * receiving system events related to streaming.
     */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 1089504..4edf8fa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -320,13 +320,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Registers an output stream that will be computed every interval
-   */
-  def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
-    ssc.registerOutputStream(outputStream.dstream)
-  }
-
-  /**
    * Creates a input stream from an queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 9dfcc08..299628c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -36,11 +36,12 @@ import org.apache.spark.streaming.Duration
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
  * sequence of RDDs (of the same type) representing a continuous stream of data (see
- * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
- * live data (such as, data from  * HDFS, Kafka or Flume) or it can be generated by transformation
- * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
- * While a Spark Streaming program is running, each DStream periodically generates a RDD,
- * either from live data or by transforming the RDD generated by a parent DStream.
+ * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
+ * DStreams can either be created from live data (such as data from Kafka, Flume, sockets or
+ * HDFS), or generated by transforming existing DStreams using operations such as `map`,
+ * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
+ * periodically generates an RDD, either from live data or by transforming the RDD generated by a
+ * parent DStream.
  *
  * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
  * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
@@ -53,6 +54,8 @@ import org.apache.spark.streaming.Duration
  *  - A list of other DStreams that the DStream depends on
  *  - A time interval at which the DStream generates an RDD
  *  - A function that is used to generate an RDD after each time interval
+ *
+ * There are two types of DStream operations - __transformations__ and __output operations__.
  */
 
 abstract class DStream[T: ClassTag] (
@@ -519,7 +522,7 @@ abstract class DStream[T: ClassTag] (
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
-    ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
   /**
@@ -586,8 +589,7 @@ abstract class DStream[T: ClassTag] (
       if (first11.size > 10) println("...")
       println()
     }
-    val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
-    ssc.registerOutputStream(newStream)
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
   /**
@@ -760,7 +762,10 @@ abstract class DStream[T: ClassTag] (
     this.foreachRDD(saveFunc)
   }
 
-  def register() {
-    ssc.registerOutputStream(this)
+  /**
+   * Register this DStream as an output DStream.
+   */
+  private[streaming] def register() {
+    ssc.graph.addOutputStream(this)
   }
 }
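
Since register() is now private[streaming], user code materializes a DStream only through output operations. A small sketch (host, port, and the println "sink" are placeholders) of foreachRDD, which, per the change above, wraps the function in a ForEachDStream and registers it internally:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object ForeachRDDSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "ForeachRDDSketch", Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)

    // foreachRDD is an output operation: it builds a ForEachDStream around this
    // function and calls register() on it, so the stream gets materialized.
    lines.foreachRDD((rdd: RDD[String], time: Time) => {
      // Placeholder sink: any action on the batch RDD would go here.
      println("Batch at " + time + " had " + rdd.count() + " records")
    })

    ssc.start()
  }
}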

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index a1075ad..2730339 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -22,20 +22,24 @@ import org.apache.spark.streaming.{Time, Duration, StreamingContext}
 import scala.reflect.ClassTag
 
 /**
- * This is the abstract base class for all input streams. This class provides to methods
- * start() and stop() which called by the scheduler to start and stop receiving data/
- * Input streams that can generated RDDs from new data just by running a service on
- * the driver node (that is, without running a receiver onworker nodes) can be
- * implemented by directly subclassing this InputDStream. For example,
- * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for
- * new files and generates RDDs on the new files. For implementing input streams
- * that requires running a receiver on the worker nodes, use NetworkInputDStream
- * as the parent class.
+ * This is the abstract base class for all input streams. This class provides the methods
+ * start() and stop(), which are called by the Spark Streaming system to start and stop
+ * receiving data. Input streams that can generate RDDs from new data by running a
+ * service/thread only on the driver node (that is, without running a receiver on worker nodes)
+ * can be implemented by directly inheriting this InputDStream. For example,
+ * FileInputDStream, a subclass of InputDStream, monitors an HDFS directory from the driver for
+ * new files and generates RDDs with the new files. For implementing input streams
+ * that require running a receiver on the worker nodes, use
+ * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class.
+ *
+ * @param ssc_ Streaming context that will execute this input stream
  */
 abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   extends DStream[T](ssc_) {
 
-  var lastValidTime: Time = null
+  private[streaming] var lastValidTime: Time = null
+
+  ssc.graph.addInputStream(this)
 
   /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
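
To illustrate the contract documented above (a hypothetical sketch, not part of the commit): a driver-only input stream only needs start(), stop() and compute(), and with this change its InputDStream constructor registers it with the graph automatically.

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical input stream that returns the same RDD in every batch. It runs entirely
// on the driver (no receiver), so it inherits InputDStream directly; the InputDStream
// constructor now calls ssc.graph.addInputStream(this), so no explicit registration.
class SingleRDDInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
  extends InputDStream[T](ssc_) {

  override def start() {}   // nothing to start on the driver
  override def stop() {}    // nothing to clean up

  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
}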

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 0f1f6fc..ce153f0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -36,11 +36,12 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
 
 /**
- * Abstract class for defining any InputDStream that has to start a receiver on worker
- * nodes to receive external data. Specific implementations of NetworkInputDStream must
+ * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
+ * that has to start a receiver on worker nodes to receive external data.
+ * Specific implementations of NetworkInputDStream must
  * define the getReceiver() function that gets the receiver object of type
- * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
- * data.
+ * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent
+ * to the workers to receive data.
  * @param ssc_ Streaming context that will execute this input stream
  * @tparam T Class type of the object of this stream
  */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 6b3e483..f577623 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -35,6 +35,11 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.streaming.{Time, Duration}
 
+/**
+ * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
+ * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
+ * these functions.
+ */
 class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 extends Serializable {
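
A tiny sketch of the implicit conversion mentioned in the new scaladoc above (not part of the diff; hosts and ports are placeholders): importing StreamingContext._ makes these pair operations, join included, available on any DStream of two-element tuples.

import org.apache.spark.streaming.{Seconds, StreamingContext}
// Brings the implicit DStream[(K, V)] -> PairDStreamFunctions[K, V] conversion into scope.
import org.apache.spark.streaming.StreamingContext._

object PairOpsSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "PairOpsSketch", Seconds(1))
    val clicks = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
    val views  = ssc.socketTextStream("localhost", 9998).map(line => (line, 1))

    // join comes from PairDStreamFunctions via the import above.
    clicks.join(views).print()
    ssc.start()
  }
}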
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/main/scala/org/apache/spark/streaming/package.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/package.scala b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
new file mode 100644
index 0000000..4dd985c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Spark Streaming functionality. [[org.apache.spark.streaming.StreamingContext]] serves as the main
+ * entry point to Spark Streaming, while [[org.apache.spark.streaming.dstream.DStream]] is the data
+ * type representing a continuous sequence of RDDs, which together represent a continuous stream of data.
+ *
+ * In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations
+ * available only on DStreams
+ * of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically
+ * available on any DStream of the right type (e.g. DStream[(Int, Int)]) through implicit
+ * conversions when you `import org.apache.spark.streaming.StreamingContext._`.
+ *
+ * For the Java API of Spark Streaming, take a look at the
+ * [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry point, and
+ * the [[org.apache.spark.streaming.api.java.JavaDStream]] and the
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]] which have the DStream functionality.
+ */
+package object streaming {
+  // For package docs only
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index be67af3..5481393 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -191,7 +191,7 @@ object MasterFailureTest extends Logging {
     val inputStream = ssc.textFileStream(testDir.toString)
     val operatedStream = operation(inputStream)
     val outputStream = new TestOutputStream(operatedStream)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 42ab959..33f6df8 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -43,7 +43,6 @@ trait JavaTestBase extends TestSuiteBase {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
-    ssc.ssc.registerInputStream(dstream)
     new JavaDStream[T](dstream)
   }
 
@@ -57,7 +56,7 @@ trait JavaTestBase extends TestSuiteBase {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
-    dstream.dstream.ssc.registerOutputStream(ostream)
+    ostream.register()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 7037aae..cb53555 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -379,7 +379,6 @@ class BasicOperationsSuite extends TestSuiteBase {
     val ssc = new StreamingContext(conf, Seconds(1))
     val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
     val stream = new TestInputStream[Int](ssc, input, 2)
-    ssc.registerInputStream(stream)
     stream.foreachRDD(_ => {})  // Dummy output stream
     ssc.start()
     Thread.sleep(2000)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 0c68c44..89daf47 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -237,7 +237,7 @@ class CheckpointSuite extends TestSuiteBase {
     val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
     val outputBuffer = new ArrayBuffer[Seq[Int]]
     var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc.start()
 
     // Create files and advance manual clock to process them

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index a8e0532..95bf40b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -54,7 +54,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
     def output = outputBuffer.flatMap(x => x)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc.start()
 
     // Feed data to the server to send to the network receiver
@@ -103,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     def output = outputBuffer.flatMap(x => x)
     val outputStream = new TestOutputStream(fileStream, outputBuffer)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc.start()
 
     // Create files in the temporary directory so that Spark Streaming can read data from it
@@ -156,7 +156,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
     def output = outputBuffer.flatMap(x => x)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc.start()
 
     // Feed data to the server to send to the network receiver
@@ -209,7 +209,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
     val outputStream = new TestOutputStream(countStream, outputBuffer)
     def output = outputBuffer.flatMap(x => x)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc.start()
 
     // Let the data from the receiver be received

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f7f3346..717da8e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -211,7 +211,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => (1 to i))
     val inputStream = new TestInputStream(s, input, 1)
-    s.registerInputStream(inputStream)
     inputStream
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e497db8/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 535e5bd..2016306 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -181,8 +181,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     val operatedStream = operation(inputStream)
     val outputStream = new TestOutputStreamWithPartitions(operatedStream,
       new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
-    ssc.registerInputStream(inputStream)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc
   }
 
@@ -207,9 +206,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     val operatedStream = operation(inputStream1, inputStream2)
     val outputStream = new TestOutputStreamWithPartitions(operatedStream,
       new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
-    ssc.registerInputStream(inputStream1)
-    ssc.registerInputStream(inputStream2)
-    ssc.registerOutputStream(outputStream)
+    outputStream.register()
     ssc
   }