Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/19 19:30:57 UTC

git commit: Merge pull request #458 from tdas/docs-update

Updated Branches:
  refs/heads/branch-0.9 34ae65b06 -> 0f077b5b4


Merge pull request #458 from tdas/docs-update

Updated Java API docs for streaming, along with very minor changes in the code examples.

Docs updated for:
Scala: StreamingContext, DStream, PairDStreamFunctions
Java: JavaStreamingContext, JavaDStream, JavaPairDStream

Example updated:
JavaQueueStream: Do not use a deprecated method
ActorWordCount: Use the public interface correctly.
(cherry picked from commit 256a3553c447db0865ea8807a8fdbccb66a97b28)

Signed-off-by: Patrick Wendell <pw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0f077b5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0f077b5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0f077b5b

Branch: refs/heads/branch-0.9
Commit: 0f077b5b480cd6034b1e6c6f34d69e0c3c3854f2
Parents: 34ae65b
Author: Patrick Wendell <pw...@gmail.com>
Authored: Sun Jan 19 10:29:54 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Jan 19 10:30:29 2014 -0800

----------------------------------------------------------------------
 .../streaming/examples/JavaQueueStream.java     |  3 +-
 .../streaming/examples/ActorWordCount.scala     |  2 +-
 .../spark/streaming/StreamingContext.scala      | 17 ++++--
 .../spark/streaming/api/java/JavaDStream.scala  | 22 +++-----
 .../streaming/api/java/JavaPairDStream.scala    |  4 ++
 .../api/java/JavaStreamingContext.scala         | 55 ++++++++++----------
 .../spark/streaming/dstream/DStream.scala       |  6 +--
 .../dstream/PairDStreamFunctions.scala          | 11 ++--
 .../streaming/receivers/ActorReceiver.scala     | 35 ++++++++-----
 9 files changed, 79 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f077b5b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index 7ef9c6c..e2d55f1 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -58,10 +58,9 @@ public final class JavaQueueStream {
     }
 
     for (int i = 0; i < 30; i++) {
-      rddQueue.add(ssc.sc().parallelize(list));
+      rddQueue.add(ssc.sparkContext().parallelize(list));
     }
 
-
     // Create the QueueInputDStream and use it to do some processing
     JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
     JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(

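The change above swaps the deprecated sc() accessor for sparkContext(). For illustration, a minimal Scala sketch of the same queue-based stream using the supported accessor (master URL, app name, and batch interval are illustrative, not taken from the example):

  import scala.collection.mutable.Queue

  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext("local[2]", "QueueStreamSketch", Seconds(1))

  // ssc.sparkContext is the supported accessor for the underlying SparkContext
  val rddQueue = new Queue[RDD[Int]]()
  rddQueue += ssc.sparkContext.parallelize(1 to 100)

  // Each batch dequeues and processes one RDD from the queue
  val inputStream = ssc.queueStream(rddQueue)
  inputStream.count().print()
  ssc.start()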
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f077b5b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 57e1b1f..5a4aa7f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -88,7 +88,7 @@ extends Actor with Receiver {
   override def preStart = remotePublisher ! SubscribeReceiver(context.self)
 
   def receive = {
-    case msg ⇒ context.parent ! pushBlock(msg.asInstanceOf[T])
+    case msg ⇒ pushBlock(msg.asInstanceOf[T])
   }
 
   override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)

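The fix above calls pushBlock directly instead of sending its result to context.parent: as the ActorReceiver changes further down show, pushBlock already forwards the data to the supervising worker actor. A minimal sketch of a receiver actor using the public interface this way (the actor and message type are illustrative):

  import akka.actor.Actor

  import org.apache.spark.streaming.receivers.Receiver

  class EchoReceiver extends Actor with Receiver {
    def receive = {
      // pushBlock hands the data to the supervisor itself; no explicit
      // message to context.parent is needed
      case msg: String => pushBlock(msg)
    }
  }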
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f077b5b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 26257e6..5847b95 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -42,9 +42,15 @@ import org.apache.spark.streaming.scheduler._
 import org.apache.hadoop.conf.Configuration
 
 /**
- * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
- * information (such as, cluster URL and job name) to internally create a SparkContext, it provides
- * methods used to create DStream from various input sources.
+ * Main entry point for Spark Streaming functionality. It provides methods used to create
+ * [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be created
+ * either by providing a Spark master URL and an appName, from an org.apache.spark.SparkConf
+ * configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext.
+ * The associated SparkContext can be accessed using `context.sparkContext`. After
+ * creating and transforming DStreams, the streaming computation can be started and stopped
+ * using `context.start()` and `context.stop()`, respectively.
+ * `context.awaitTermination()` allows the current thread to wait for the termination
+ * of the context, either by `stop()` or by an exception.
  */
 class StreamingContext private[streaming] (
     sc_ : SparkContext,
@@ -63,7 +69,7 @@ class StreamingContext private[streaming] (
 
   /**
    * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
-   * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
+   * @param conf an org.apache.spark.SparkConf object specifying Spark parameters
    * @param batchDuration the time interval at which streaming data will be divided into batches
    */
   def this(conf: SparkConf, batchDuration: Duration) = {
@@ -88,7 +94,7 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Re-create a StreamingContext from a checkpoint file.
+   * Recreate a StreamingContext from a checkpoint file.
    * @param path Path to the directory that was specified as the checkpoint directory
    * @param hadoopConf Optional, configuration object if necessary for reading from
    *                   HDFS compatible filesystems
@@ -151,6 +157,7 @@ class StreamingContext private[streaming] (
   private[streaming] val scheduler = new JobScheduler(this)
 
   private[streaming] val waiter = new ContextWaiter
+
   /**
    * Return the associated Spark context
    */

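The lifecycle described by the updated Scaladoc, as a minimal Scala sketch (master, app name, and the socket source are illustrative):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // Create the context from a SparkConf; batches are cut every second
  val conf = new SparkConf().setMaster("local[2]").setAppName("LifecycleSketch")
  val ssc = new StreamingContext(conf, Seconds(1))

  // Define the streaming computation before starting the context
  val lines = ssc.socketTextStream("localhost", 9999)
  lines.count().print()

  ssc.start()             // start the computation
  ssc.awaitTermination()  // block until stop() or an exception terminates it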
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f077b5b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index c92854c..e23b725 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -27,22 +27,12 @@ import scala.reflect.ClassTag
 import org.apache.spark.streaming.dstream.DStream
 
 /**
- * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
- * for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
- * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
- * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
- * by a parent DStream.
- *
- * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
- * `window`. In addition, [[org.apache.spark.streaming.api.java.JavaPairDStream]] contains operations available
- * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`.
- *
- * DStreams internally is characterized by a few basic properties:
- *  - A list of other DStreams that the DStream depends on
- *  - A time interval at which the DStream generates an RDD
- *  - A function that is used to generate an RDD after each time interval
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.DStream]], the basic
+ * abstraction in Spark Streaming that represents a continuous stream of data.
+ * DStreams can either be created from live data (such as data from TCP sockets, Kafka, Flume,
+ * etc.) or be generated by transforming existing DStreams using operations such as `map` and
+ * `window`. For operations applicable to key-value pair DStreams, see
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]].
  */
 class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T])
     extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {

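As the rewritten Scaladoc notes, a DStream is transformed into new DStreams rather than mutated in place. A brief Scala sketch of chained map-style and window operations (the source and durations are illustrative):

  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext("local[2]", "DStreamOpsSketch", Seconds(1))
  val lines = ssc.socketTextStream("localhost", 9999)

  // Each operation returns a new DStream derived from the previous one
  val words    = lines.flatMap(_.split(" "))
  val windowed = words.window(Seconds(30))   // words seen in the last 30 seconds
  windowed.count().print()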
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f077b5b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 6bb985c..79fa6a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -37,6 +37,10 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming.dstream.DStream
 
+/**
+ * A Java-friendly interface to a DStream of key-value pairs, which provides extra methods
+ * like `reduceByKey` and `join`.
+ */
 class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     implicit val kManifest: ClassTag[K],
     implicit val vManifest: ClassTag[V])

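The new class comment mentions reduceByKey and join; in the Scala API the same pair operations come in via PairDStreamFunctions. A hedged sketch (the source and derived streams are illustrative):

  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits

  val ssc = new StreamingContext("local[2]", "PairOpsSketch", Seconds(1))
  val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

  val counts  = words.map(w => (w, 1)).reduceByKey(_ + _)
  val lengths = words.map(w => (w, w.length))

  // join matches keys across the two pair streams within each batch
  val joined = counts.join(lengths)   // DStream[(String, (Int, Int))]
  joined.print()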
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f077b5b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 613683c..921b561 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
 import java.io.InputStream
-import java.lang.{Integer => JInt}
 import java.util.{List => JList, Map => JMap}
 
 import akka.actor.{Props, SupervisorStrategy}
@@ -39,19 +38,20 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.streaming.dstream.DStream
 
 /**
- * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
- * information (such as, cluster URL and job name) to internally create a SparkContext, it provides
- * methods used to create DStream from various input sources.
+ * A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main
+ * entry point for Spark Streaming functionality. It provides methods to create
+ * [[org.apache.spark.streaming.api.java.JavaDStream]] and
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]] from input sources. The internal
+ * org.apache.spark.api.java.JavaSparkContext (see core Spark documentation) can be accessed
+ * using `context.sparkContext`. After creating and transforming DStreams, the streaming
+ * computation can be started and stopped using `context.start()` and `context.stop()`,
+ * respectively. `context.awaitTermination()` allows the current thread to wait for the
+ * termination of the context by `stop()` or by an exception.
  */
 class JavaStreamingContext(val ssc: StreamingContext) {
 
-  // TODOs:
-  // - Test to/from Hadoop functions
-  // - Support creating and registering InputStreams
-
-
   /**
-   * Creates a StreamingContext.
+   * Create a StreamingContext.
    * @param master Name of the Spark Master
    * @param appName Name to be used when registering with the scheduler
    * @param batchDuration The time interval at which streaming data will be divided into batches
@@ -60,7 +60,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     this(new StreamingContext(master, appName, batchDuration, null, Nil, Map()))
 
   /**
-   * Creates a StreamingContext.
+   * Create a StreamingContext.
    * @param master Name of the Spark Master
    * @param appName Name to be used when registering with the scheduler
    * @param batchDuration The time interval at which streaming data will be divided into batches
@@ -77,7 +77,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map()))
 
   /**
-   * Creates a StreamingContext.
+   * Create a StreamingContext.
    * @param master Name of the Spark Master
    * @param appName Name to be used when registering with the scheduler
    * @param batchDuration The time interval at which streaming data will be divided into batches
@@ -94,7 +94,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map()))
 
   /**
-   * Creates a StreamingContext.
+   * Create a StreamingContext.
    * @param master Name of the Spark Master
    * @param appName Name to be used when registering with the scheduler
    * @param batchDuration The time interval at which streaming data will be divided into batches
@@ -113,7 +113,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
 
   /**
-   * Creates a StreamingContext using an existing SparkContext.
+   * Create a JavaStreamingContext using an existing JavaSparkContext.
    * @param sparkContext The underlying JavaSparkContext to use
    * @param batchDuration The time interval at which streaming data will be divided into batches
    */
@@ -121,7 +121,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     this(new StreamingContext(sparkContext.sc, batchDuration))
 
   /**
-   * Creates a StreamingContext using an existing SparkContext.
+   * Create a JavaStreamingContext using a SparkConf configuration.
    * @param conf A Spark application configuration
    * @param batchDuration The time interval at which streaming data will be divided into batches
    */
@@ -129,19 +129,18 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     this(new StreamingContext(conf, batchDuration))
 
   /**
-   * Re-creates a StreamingContext from a checkpoint file.
+   * Recreate a JavaStreamingContext from a checkpoint file.
    * @param path Path to the directory that was specified as the checkpoint directory
    */
   def this(path: String) = this(new StreamingContext(path, new Configuration))
 
   /**
-   * Re-creates a StreamingContext from a checkpoint file.
+   * Recreate a JavaStreamingContext from a checkpoint file.
    * @param path Path to the directory that was specified as the checkpoint directory
    *
    */
   def this(path: String, hadoopConf: Configuration) = this(new StreamingContext(path, hadoopConf))
 
-
   @deprecated("use sparkContext", "0.9.0")
   val sc: JavaSparkContext = sparkContext
 
@@ -149,7 +148,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   val sparkContext = new JavaSparkContext(ssc.sc)
 
   /**
-   * Create a input stream from network source hostname:port. Data is received using
+   * Create an input stream from network source hostname:port. Data is received using
   * a TCP socket and the received bytes are interpreted as UTF8 encoded \n delimited
    * lines.
    * @param hostname      Hostname to connect to for receiving data
@@ -162,7 +161,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Create a input stream from network source hostname:port. Data is received using
+   * Create an input stream from network source hostname:port. Data is received using
   * a TCP socket and the received bytes are interpreted as UTF8 encoded \n delimited
    * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param hostname      Hostname to connect to for receiving data
@@ -173,7 +172,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Create a input stream from network source hostname:port. Data is received using
+   * Create an input stream from network source hostname:port. Data is received using
   * a TCP socket and the received bytes are interpreted as objects using the given
    * converter.
    * @param hostname      Hostname to connect to for receiving data
@@ -195,7 +194,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Create a input stream that monitors a Hadoop-compatible filesystem
+   * Create an input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them as text files (using key as LongWritable, value
    * as Text and input format as TextInputFormat). Files must be written to the
    * monitored directory by "moving" them from another location within the same
@@ -207,7 +206,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Create a input stream from network source hostname:port, where data is received
+   * Create an input stream from network source hostname:port, where data is received
   * as serialized blocks (serialized using Spark's serializer) that can be directly
    * pushed into the block manager without deserializing them. This is the most efficient
    * way to receive data.
@@ -226,7 +225,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Create a input stream from network source hostname:port, where data is received
+   * Create an input stream from network source hostname:port, where data is received
   * as serialized blocks (serialized using Spark's serializer) that can be directly
    * pushed into the block manager without deserializing them. This is the most efficient
    * way to receive data.
@@ -241,7 +240,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Create a input stream that monitors a Hadoop-compatible filesystem
+   * Create an input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
    * Files must be written to the monitored directory by "moving" them from another
    * location within the same file system. File names starting with . are ignored.
@@ -324,7 +323,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates a input stream from an queue of RDDs. In each batch,
+   * Creates an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
    * NOTE: changes to the queue after the stream is created will not be recognized.
@@ -340,7 +339,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates a input stream from an queue of RDDs. In each batch,
+   * Creates an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
    * NOTE: changes to the queue after the stream is created will not be recognized.
@@ -357,7 +356,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates a input stream from an queue of RDDs. In each batch,
+   * Creates an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
    * NOTE: changes to the queue after the stream is created will not be recognized.

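Among the constructors documented above is recreation from a checkpoint directory. A minimal Scala sketch of that path (the checkpoint location is illustrative and must have been written by a previous run via ssc.checkpoint(...)):

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.streaming.StreamingContext

  // Rebuild the context, including its DStream graph, from checkpointed state
  val checkpointPath = "hdfs://namenode:8020/checkpoints/app"  // illustrative
  val ssc = new StreamingContext(checkpointPath, new Configuration())
  ssc.start()
  ssc.awaitTermination()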
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f077b5b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 71a4c5c..6bff56a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -37,8 +37,9 @@ import org.apache.spark.streaming.Duration
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
  * sequence of RDDs (of the same type) representing a continuous stream of data (see
  * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
- * DStreams can either be created from live data (such as, data from Kafka, Flume, sockets, HDFS)
- * or it can be generated by transforming existing DStreams using operations such as `map`,
+ * DStreams can either be created from live data (such as data from TCP sockets, Kafka, Flume,
+ * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or be generated by
+ * transforming existing DStreams using operations such as `map`,
  * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
 * periodically generates an RDD, either from live data or by transforming the RDD generated by a
  * parent DStream.
@@ -540,7 +541,6 @@ abstract class DStream[T: ClassTag] (
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
-    //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
     val cleanedF = context.sparkContext.clean(transformFunc)
     val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)

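The cleaned-up transform above takes an (RDD, Time) function, exposing each batch as a plain RDD together with its batch time. A short sketch (the source and the use made of the timestamp are illustrative):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

  val ssc = new StreamingContext("local[2]", "TransformSketch", Seconds(1))
  val lines = ssc.socketTextStream("localhost", 9999)

  // Arbitrary RDD-to-RDD code runs once per batch interval
  val stamped = lines.transform { (rdd: RDD[String], time: Time) =>
    rdd.map(line => (line, time.milliseconds))   // tag each line with its batch time
  }
  stamped.print()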
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f077b5b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f577623..fb9df2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -18,20 +18,17 @@
 package org.apache.spark.streaming.dstream
 
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream._
 
 import org.apache.spark.{Partitioner, HashPartitioner}
 import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.RDD
 
 import scala.collection.mutable.ArrayBuffer
-import scala.reflect.{ClassTag, classTag}
+import scala.reflect.ClassTag
 
-import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.streaming.{Time, Duration}
 
@@ -108,7 +105,7 @@ extends Serializable {
   /**
    * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
    * combineByKey for RDDs. Please refer to combineByKey in
-   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+   * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
    */
   def combineByKey[C: ClassTag](
     createCombiner: V => C,

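The doc comment above defers to the RDD-side combineByKey semantics. A hedged Scala sketch of the DStream version, here just re-deriving a per-key count (the source, partitioner width, and combiner functions are illustrative):

  import org.apache.spark.HashPartitioner
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits

  val ssc = new StreamingContext("local[2]", "CombineSketch", Seconds(1))
  val pairs = ssc.socketTextStream("localhost", 9999)
    .flatMap(_.split(" "))
    .map(w => (w, 1))

  val counts = pairs.combineByKey(
    (v: Int) => v,                  // createCombiner: first value for a key
    (c: Int, v: Int) => c + v,      // mergeValue: fold a value into a combiner
    (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners: merge two partial counts
    new HashPartitioner(2))
  counts.print()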
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f077b5b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index fdf5371..79ed696 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -44,40 +44,49 @@ object ReceiverSupervisorStrategy {
 
 /**
  * A receiver trait to be mixed in with your Actor to gain access to
- * pushBlock API.
+ * the API for pushing received data into Spark Streaming to be processed.
  *
  * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
  * 
  * @example {{{
  *  class MyActor extends Actor with Receiver{
  *      def receive {
- *          case anything :String => pushBlock(anything)
+ *          case anything: String => pushBlock(anything)
  *      }
  *  }
- *  //Can be plugged in actorStream as follows
+ *
+ *  // Can be used with an actorStream as follows
  *  ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
  *
  * }}}
  *
- * @note An important point to note:
- *       Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * @note Since an Actor may exist outside the Spark framework, it is the user's responsibility
 *       to ensure type safety, i.e. the parametrized type of pushBlock and InputDStream
 *       should be the same.
- *
  */
-trait Receiver { self: Actor ⇒
+trait Receiver {
+
+  self: Actor ⇒ // to ensure that this trait can only be mixed into Actor classes
+
+  /**
+   * Push an iterator of received data into Spark Streaming for processing
+   */
   def pushBlock[T: ClassTag](iter: Iterator[T]) {
     context.parent ! Data(iter)
   }
 
+  /**
+   * Push a single item of received data into Spark Streaming for processing
+   */
   def pushBlock[T: ClassTag](data: T) {
     context.parent ! Data(data)
   }
-
 }
 
 /**
- * Statistics for querying the supervisor about state of workers
+ * Statistics for querying the supervisor about the state of workers. Used in
+ * conjunction with `StreamingContext.actorStream` and
+ * [[org.apache.spark.streaming.receivers.Receiver]].
  */
 case class Statistics(numberOfMsgs: Int,
   numberOfWorkers: Int,
@@ -96,17 +105,15 @@ private[streaming] case class Data[T: ClassTag](data: T)
  * his own Actor to run as receiver for Spark Streaming input source.
  *
  * This starts a supervisor actor which starts workers and also provides
- *  [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance].
  *
- *  Here's a way to start more supervisor/workers as its children.
+ * Here's a way to start more supervisor/workers as its children.
  *
  * @example {{{
  *  context.parent ! Props(new Supervisor)
  * }}} OR {{{
- *  context.parent ! Props(new Worker,"Worker")
+ *  context.parent ! Props(new Worker, "Worker")
  * }}}
- *
- *
  */
 private[streaming] class ActorReceiver[T: ClassTag](
   props: Props,