Posted to commits@spark.apache.org by td...@apache.org on 2014/12/11 15:21:29 UTC

[1/2] spark git commit: [SPARK-4806] Streaming doc update for 1.2

Repository: spark
Updated Branches:
  refs/heads/master 2a5b5fd4c -> b004150ad


http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 5ebe834..1ac5b9e 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -7,13 +7,13 @@ title: Spark Streaming Programming Guide
 {:toc}
 
 # Overview
-Spark Streaming is an extension of the core Spark API that allows enables scalable, high-throughput,
+Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput,
 fault-tolerant stream processing of live data streams. Data can be ingested from many sources
-like Kafka, Flume, Twitter, ZeroMQ, Kinesis or plain old TCP sockets and be processed using complex
+like Kafka, Flume, Twitter, ZeroMQ, Kinesis or TCP sockets, and can be processed using complex
 algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`.
 Finally, processed data can be pushed out to filesystems, databases,
 and live dashboards. In fact, you can apply Spark's
-[machine learning](mllib-guide.html) algorithms, and
+[machine learning](mllib-guide.html) and
 [graph processing](graphx-programming-guide.html) algorithms on data streams.
 
 <p style="text-align: center;">
@@ -38,16 +38,25 @@ stream of results in batches.
 
 Spark Streaming provides a high-level abstraction called *discretized stream* or *DStream*,
 which represents a continuous stream of data. DStreams can be created either from input data
-stream from sources such as Kafka, Flume, and Kinesis, or by applying high-level
+streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level
 operations on other DStreams. Internally, a DStream is represented as a sequence of
 [RDDs](api/scala/index.html#org.apache.spark.rdd.RDD).
 
 This guide shows you how to start writing Spark Streaming programs with DStreams. You can
-write Spark Streaming programs in Scala or Java, both of which are presented in this guide. You
-will find tabs throughout this guide that let you choose between Scala and Java
-code snippets.
+write Spark Streaming programs in Scala, Java or Python (introduced in Spark 1.2),
+all of which are presented in this guide.
+You will find tabs throughout this guide that let you choose between code snippets of
+different languages.
+
+**Note:** The Python API for Spark Streaming was introduced in Spark 1.2. It has all the DStream
+transformations and almost all the output operations available in the Scala and Java interfaces.
+However, it currently supports only basic sources like text files and text data over sockets.
+APIs for additional sources, like Kafka and Flume, will be available in the future.
+Further information about the features available in the Python API is mentioned throughout this
+document; look for the tag
+<span class="badge" style="background-color: grey">Python API</span>.
 
-***************************************************************************************************  
+***************************************************************************************************
 
 # A Quick Example
 Before we go into the details of how to write your own Spark Streaming program,
@@ -76,7 +85,7 @@ val ssc = new StreamingContext(conf, Seconds(1))
 {% endhighlight %}
 
 Using this context, we can create a DStream that represents streaming data from a TCP
-source hostname, e.g. `localhost`, and port, e.g. `9999`
+source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`).
 
 {% highlight scala %}
 // Create a DStream that will connect to hostname:port, like localhost:9999
@@ -141,11 +150,11 @@ import scala.Tuple2;
 
 // Create a local StreamingContext with two working threads and batch interval of 1 second
 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
-JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000))
+JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1))
 {% endhighlight %}
 
 Using this context, we can create a DStream that represents streaming data from a TCP
-source hostname, e.g. `localhost`, and port, e.g. `9999`
+source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`).
 
 {% highlight java %}
 // Create a DStream that will connect to hostname:port, like localhost:9999
@@ -216,7 +225,7 @@ The complete code can be found in the Spark Streaming example
 
 </div>
 <div data-lang="python"  markdown="1" >
-First, we import StreamingContext, which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and batch interval of 1 second.
+First, we import [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext), which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.
 
 {% highlight python %}
 from pyspark import SparkContext
@@ -228,7 +237,7 @@ ssc = StreamingContext(sc, 1)
 {% endhighlight %}
 
 Using this context, we can create a DStream that represents streaming data from a TCP
-source hostname, e.g. `localhost`, and port, e.g. `9999`
+source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`).
 
 {% highlight python %}
 # Create a DStream that will connect to hostname:port, like localhost:9999
@@ -308,7 +317,7 @@ $ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py loc
 
 
 Then, any lines typed in the terminal running the netcat server will be counted and printed on
-screen every second. It will look something like this.
+screen every second. It will look something like the following.
 
 <table width="100%">
     <td>
@@ -372,7 +381,7 @@ Time: 2014-10-14 15:25:21
 ...
 {% endhighlight %}
 </div>
-</div>    
+</div>
     </td>
 </table>
 
@@ -382,8 +391,7 @@ Time: 2014-10-14 15:25:21
 
 # Basic Concepts
 
-Next, we move beyond the simple example and elaborate on the basics of Spark Streaming that you
-need to know to write your streaming applications.
+Next, we move beyond the simple example and elaborate on the basics of Spark Streaming.
 
 ## Linking
 
@@ -414,7 +422,7 @@ some of the common ones are as follows.
 <tr><th>Source</th><th>Artifact</th></tr>
 <tr><td> Kafka </td><td> spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> Flume </td><td> spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} </td></tr>
-<tr><td> Kinesis<br/></td><td>spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} [Apache Software License] </td></tr>
+<tr><td> Kinesis<br/></td><td>spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} [Amazon Software License] </td></tr>
 <tr><td> Twitter </td><td> spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} </td></tr>
@@ -446,7 +454,7 @@ val ssc = new StreamingContext(conf, Seconds(1))
 
 The `appName` parameter is a name for your application to show on the cluster UI.
 `master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),
-or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster, 
+or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
 you will not want to hardcode `master` in the program,
 but rather [launch the application with `spark-submit`](submitting-applications.html) and
 receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming
@@ -481,7 +489,7 @@ JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration(1000));
 
 The `appName` parameter is a name for your application to show on the cluster UI.
 `master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),
-or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster, 
+or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
 you will not want to hardcode `master` in the program,
 but rather [launch the application with `spark-submit`](submitting-applications.html) and
 receive it there. However, for local testing and unit tests, you can pass "local[*]" to run Spark Streaming
@@ -497,8 +505,8 @@ A `JavaStreamingContext` object can also be created from an existing `JavaSparkC
 import org.apache.spark.streaming.api.java.*;
 
 JavaSparkContext sc = ...   //existing JavaSparkContext
-JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
-{% endhighlight %} 
+JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
+{% endhighlight %}
 </div>
 <div data-lang="python" markdown="1">
 
@@ -514,7 +522,7 @@ ssc = StreamingContext(sc, 1)
 
 The `appName` parameter is a name for your application to show on the cluster UI.
 `master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),
-or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster, 
+or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
 you will not want to hardcode `master` in the program,
 but rather [launch the application with `spark-submit`](submitting-applications.html) and
 receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming
@@ -526,17 +534,18 @@ section for more details.
 </div>
 </div>
 
-After a context is defined, you have to do the follow steps.
+After a context is defined, you have to do the following.
 
-1. Define the input sources.
-1. Setup the streaming computations.
-1. Start the receiving and procesing of data using `streamingContext.start()`.
-1. The processing will continue until `streamingContext.stop()` is called.
+1. Define the input sources by creating input DStreams.
+1. Define the streaming computations by applying transformation and output operations to DStreams.
+1. Start receiving data and processing it using `streamingContext.start()`.
+1. Wait for the processing to be stopped (manually or due to any error) using `streamingContext.awaitTermination()`.
+1. The processing can be manually stopped using `streamingContext.stop()`.
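+
+For example, putting these steps together for the word count from the
+[quick example](#a-quick-example) gives a minimal Scala sketch like the following.
+
+{% highlight scala %}
+import org.apache.spark._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+
+// 1. Create the context and define the input source
+val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
+val ssc = new StreamingContext(conf, Seconds(1))
+val lines = ssc.socketTextStream("localhost", 9999)
+
+// 2. Define the streaming computation and an output operation
+val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
+wordCounts.print()
+
+// 3. Start receiving and processing data
+ssc.start()
+
+// 4. Wait for the processing to be stopped (manually or due to an error)
+ssc.awaitTermination()
+{% endhighlight %}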
 
 ##### Points to remember:
 {:.no_toc}
-- Once a context has been started, no new streaming computations can be setup or added to it.
-- Once a context has been stopped, it cannot be started (that is, re-used) again.
+- Once a context has been started, no new streaming computations can be set up or added to it.
+- Once a context has been stopped, it cannot be restarted.
 - Only one StreamingContext can be active in a JVM at the same time.
 - stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of `stop()` called `stopSparkContext` to false.
 - A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
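+
+For example, a sketch of stopping only the StreamingContext and reusing the underlying
+SparkContext (assuming the `ssc` and `sc` objects and imports from the examples above):
+
+{% highlight scala %}
+// Stop the StreamingContext but keep the SparkContext alive
+ssc.stop(stopSparkContext = false)
+
+// The same SparkContext can then be used to create a new StreamingContext
+val newSsc = new StreamingContext(sc, Seconds(5))
+{% endhighlight %}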
@@ -577,29 +586,54 @@ These operations are discussed in detail in later sections.
 
 ***
 
-## Input DStreams
-Input DStreams are DStreams representing the stream of raw data received from streaming sources.
-Spark Streaming has two categories of streaming sources.
+## Input DStreams and Receivers
+Input DStreams are DStreams representing the stream of input data received from streaming
+sources. In the [quick example](#a-quick-example), `lines` was an input DStream as it represented
+the stream of data received from the netcat server. Every input DStream
+(except file stream, discussed later in this section) is associated with a **Receiver**
+([Scala doc](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver),
+[Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)) object which receives the
+data from a source and stores it in Spark's memory for processing.
+
+Spark Streaming provides two categories of built-in streaming sources.
+
+- *Basic sources*: Sources directly available in the StreamingContext API.
+  Example: file systems, socket connections, and Akka actors.
+- *Advanced sources*: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through
+  extra utility classes. These require linking against extra dependencies as discussed in the
+  [linking](#linking) section.
+
+We are going to discuss some of the sources present in each category later in this section.
+
+Note that if you want to receive multiple streams of data in parallel in your streaming
+application, you can create multiple input DStreams (discussed
+further in the [Performance Tuning](#level-of-parallelism-in-data-receiving) section). This will
+create multiple receivers which will simultaneously receive multiple data streams. But note that a
+receiver runs within a Spark worker/executor as a long-running task, hence it occupies one of the
+cores allocated to the Spark Streaming application. Hence, it is important to remember that a Spark
+Streaming application needs to be allocated enough cores (or threads, if running locally) to process
+the received data, as well as to run the receiver(s).
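+
+For example, a sketch of receiving two socket streams in parallel and processing them as a
+single DStream (assuming the `ssc` from the earlier examples; hostnames are illustrative):
+
+{% highlight scala %}
+// Two input DStreams, hence two receivers running simultaneously
+val stream1 = ssc.socketTextStream("host1", 9999)
+val stream2 = ssc.socketTextStream("host2", 9999)
+
+// Combine them into a single DStream for further processing
+val combined = stream1.union(stream2)
+{% endhighlight %}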
 
-- *Basic sources*: Sources directly available in the StreamingContext API. Example: file systems, socket connections, and Akka actors.
-- *Advanced sources*: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the [linking](#linking) section.
+##### Points to remember
+{:.no_toc}
 
-Every input DStream (except file stream) is associated with a single [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) object which receives the data from a source and stores it in Spark's memory for processing. So every input DStream receives a single stream of data. Note that in a streaming application, you can create multiple input DStreams to receive multiple streams of data in parallel. This is discussed later in the [Performance Tuning](#level-of-parallelism-in-data-receiving) section.
+- When running a Spark Streaming program locally, do not use "local" or "local[1]" as the master URL.
+  Either of these means that only one thread will be used for running tasks locally. If you are using
+  an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will
+  be used to run the receiver, leaving no thread for processing the received data. Hence, when
+  running locally, always use "local[*n*]" as the master URL where *n* > number of receivers to run
+  (see [Spark Properties](configuration.html#spark-properties) for information on how to set
+  the master).
 
-A receiver is run within a Spark worker/executor as a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Hence, it is important to remember that Spark Streaming application needs to be allocated enough cores to process the received data, as well as, to run the receiver(s). Therefore, few important points to remember are:
+- Extending this logic to running on a cluster, the number of cores allocated to the Spark Streaming
+  application must be more than the number of receivers. Otherwise the system will receive data, but
+  not be able to process it.
 
-##### Points to remember
-{:.no_toc}
-- If the number of threads allocated to the application is less than or equal to the number of input DStreams / receivers, then the system will receive data, but not be able to process them.
-- When running locally, if you master URL is set to "local", then there is only one core to run tasks.  That is insufficient for programs using a DStream as the receiver (file streams are okay).  So, a "local" master URL in a streaming app is generally going to cause starvation for the processor.  
-Thus in any streaming app, you generally will want to allocate more than one thread (i.e. set your master to "local[2]") when testing locally.
-See [Spark Properties] (configuration.html#spark-properties.html).
-  
 ### Basic Sources
 {:.no_toc}
 
-We have already taken a look at the `ssc.socketTextStream(...)` in the [quick
-example](#a-quick-example) which creates a DStream from text
+We have already taken a look at the `ssc.socketTextStream(...)` in the [quick example](#a-quick-example)
+which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the StreamingContext API provides
 methods for creating DStreams from files and Akka actors as input sources.
 
@@ -607,10 +641,10 @@ methods for creating DStreams from files and Akka actors as input sources.
 
     <div class="codetabs">
     <div data-lang="scala" markdown="1">
-		streamingContext.fileStream[keyClass, valueClass, inputFormatClass](dataDirectory)
+        streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
     </div>
     <div data-lang="java" markdown="1">
-		streamingContext.fileStream<keyClass, valueClass, inputFormatClass>(dataDirectory);
+		streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
     </div>
     <div data-lang="python" markdown="1">
 		streamingContext.textFileStream(dataDirectory)
@@ -626,22 +660,42 @@ methods for creating DStreams from files and Akka actors as input sources.
 
 	For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence do not require allocating cores.
 
-- **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver Guide](streaming-custom-receivers.html#implementing-and-using-a-custom-actor-based-receiver) for more details.
+	<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
+	`fileStream` is not available in the Python API; only `textFileStream` is available.
+
+- **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
+  actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver
+  Guide](streaming-custom-receivers.html) for more details.
+
+  <span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala
+  libraries, `actorStream` is not available in the Python API.
 
 - **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
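+
+  For example, a minimal Scala sketch of this testing pattern (assuming an existing `ssc`) could be
+  as follows.
+
+{% highlight scala %}
+import scala.collection.mutable.Queue
+import org.apache.spark.rdd.RDD
+
+val rddQueue = new Queue[RDD[Int]]()          // queue of RDDs to feed into the DStream
+val inputStream = ssc.queueStream(rddQueue)   // each queued RDD becomes one batch
+inputStream.count().print()                   // e.g. print the number of elements per batch
+
+// Push test RDDs into the queue (typically done after ssc.start())
+rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
+{% endhighlight %}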
 
 For more details on streams from sockets, files, and actors,
 see the API documentations of the relevant functions in
 [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for
-Scala and [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) for Java.
+Scala, [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html)
+for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) for Python.
 
 ### Advanced Sources
 {:.no_toc}
-This category of sources require interfacing with external non-Spark libraries, some of them with complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts of dependencies, the functionality to create DStreams from these sources have been moved to separate libraries, that can be [linked to](#linking) explicitly as necessary.  For example, if you want to create a DStream using data from Twitter's stream of tweets, you have to do the following.
-
-1. *Linking*: Add the artifact `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` to the SBT/Maven project dependencies.
-1. *Programming*: Import the `TwitterUtils` class and create a DStream with `TwitterUtils.createStream` as shown below.
-1. *Deploying*: Generate an uber JAR with all the dependencies (including the dependency `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and its transitive dependencies) and then deploy the application. This is further explained in the [Deploying section](#deploying-applications).
+<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
+these sources are not available in the Python API.
+
+This category of sources requires interfacing with external non-Spark libraries, some of them with
+complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
+of dependencies, the functionality to create DStreams from these sources has been moved to separate
+libraries that can be [linked](#linking) to explicitly when necessary. For example, if you want to
+create a DStream using data from Twitter's stream of tweets, you have to do the following.
+
+1. *Linking*: Add the artifact `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` to the
+  SBT/Maven project dependencies.
+1. *Programming*: Import the `TwitterUtils` class and create a DStream with
+  `TwitterUtils.createStream` as shown below.
+1. *Deploying*: Generate an uber JAR with all the dependencies (including the dependency
+  `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and its transitive dependencies) and
+  then deploy the application. This is further explained in the [Deploying section](#deploying-applications).
 
 <div class="codetabs">
 <div data-lang="scala">
@@ -660,17 +714,21 @@ TwitterUtils.createStream(jssc);
 </div>
 </div>
 
-Note that these advanced sources are not available in the `spark-shell`, hence applications based on these
-advanced sources cannot be tested in the shell.
+Note that these advanced sources are not available in the Spark shell, hence applications based on
+these advanced sources cannot be tested in the shell. If you really want to use them in the Spark
+shell you will have to download the corresponding Maven artifact's JAR along with its dependencies
+and add it to the classpath.
 
 Some of these advanced sources are as follows.
 
 - **Twitter:** Spark Streaming's TwitterUtils uses Twitter4j 3.0.3 to get the public stream of tweets using
-    [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information
-    can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by
-    Twitter4J library. You can either get the public stream, or get the filtered stream based on a
-    keywords. See the API documentation ([Scala](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$), [Java](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html)) and examples ([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala) and
-    [TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)).
+  [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information
+  can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by
+  the Twitter4J library. You can either get the public stream, or get a filtered stream based on
+  keywords. See the API documentation ([Scala](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$),
+  [Java](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html)) and examples
+  ([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala)
+  and [TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)).
 
 - **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can receive data from Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.
 
@@ -680,14 +738,37 @@ Some of these advanced sources are as follows.
 
 ### Custom Sources
 {:.no_toc}
-Input DStreams can also be created out of custom data sources. All you have to do is implement an user-defined **receiver** (see next section to understand what that is) that can receive data from the custom sources and push it into Spark. See the
-[Custom Receiver Guide](streaming-custom-receivers.html) for details.
+
+<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
+these sources are not available in the Python API.
+
+Input DStreams can also be created out of custom data sources. All you have to do is implement a
+user-defined **receiver** (see next section to understand what that is) that can receive data from
+the custom sources and push it into Spark. See the [Custom Receiver
+Guide](streaming-custom-receivers.html) for details.
+
+### Receiver Reliability
+{:.no_toc}
+
+There can be two kinds of data sources based on their *reliability*. Sources
+(like Kafka and Flume) allow the transferred data to be acknowledged. If the system receiving
+data from these *reliable* sources acknowledges the received data correctly, it can be ensured
+that no data gets lost due to any kind of failure. This leads to two kinds of receivers.
+
+1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges a reliable
+  source that the data has been received and stored in Spark with replication.
+1. *Unreliable Receiver* - These are receivers for sources that do not support acknowledging. Even
+  for reliable sources, one may implement an unreliable receiver that does not go into the complexity
+  of acknowledging correctly.
+
+The details of how to write a reliable receiver are discussed in the
+[Custom Receiver Guide](streaming-custom-receivers.html).
 
 ***
 
 ## Transformations on DStreams
 Similar to that of RDDs, transformations allow the data from the input DStream to be modified.
-DStreams support many of the transformations available on normal Spark RDD's. 
+DStreams support many of the transformations available on normal Spark RDDs.
 Some of the common ones are as follows.
 
 <table class="table">
@@ -841,6 +922,10 @@ the `(word, 1)` pairs) and the `runningCount` having the previous count. For the
 Python code, take a look at the example
 [stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).
 
+Note that using `updateStateByKey` requires the checkpoint directory to be configured, which is
+discussed in detail in the [checkpointing](#checkpointing) section.
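+
+For example, a one-line sketch of enabling this (the directory shown is just a placeholder):
+
+{% highlight scala %}
+// The checkpoint directory must be set before ssc.start() is called
+ssc.checkpoint("checkpoint")   // e.g. an HDFS directory, or a local path for testing
+{% endhighlight %}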
+
+
 #### Transform Operation
 {:.no_toc}
 The `transform` operation (along with its variations like `transformWith`) allows
@@ -948,7 +1033,7 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer
 };
 
 // Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
+JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));
 {% endhighlight %}
 
 </div>
@@ -1005,7 +1090,8 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.
   of keys as the window slides. However, it is applicable to only "invertible reduce functions",
   that is, those reduce functions which have a corresponding "inverse reduce" function (taken as
  parameter <i>invFunc</i>). Like in <code>reduceByKeyAndWindow</code>, the number of reduce tasks
-  is configurable through an optional argument.
+  is configurable through an optional argument. Note that [checkpointing](#checkpointing) must be
+  enabled for using this operation.
 </td>
 </tr>
 <tr>
@@ -1026,49 +1112,58 @@ see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
 and [PairDStreamFunctions](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
 For the Java API, see [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html)
 and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html).
-For the Python API, see [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
+For the Python API, see [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream).
 
 ***
 
 ## Output Operations on DStreams
 Output operations allow DStream's data to be pushed out to external systems like a database or a file system.
 Since the output operations actually allow the transformed data to be consumed by external systems,
-they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). 
+they trigger the actual execution of all the DStream transformations (similar to actions for RDDs).
 Currently, the following output operations are defined:
 
 <table class="table">
 <tr><th style="width:30%">Output Operation</th><th>Meaning</th></tr>
 <tr>
   <td> <b>print</b>()</td>
-  <td> Prints first ten elements of every batch of data in a DStream on the driver. 
-  This is useful for development and debugging. 
+  <td> Prints the first ten elements of every batch of data in a DStream on the driver node running
+  the streaming application. This is useful for development and debugging.
   <br/>
-  <b>PS</b>: called <b>pprint</b>() in Python)
+  <span class="badge" style="background-color: grey">Python API</span> This is called
+  <b>pprint()</b> in the Python API.
   </td>
 </tr>
 <tr>
+  <td> <b>saveAsTextFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
+  <td> Save this DStream's contents as text files. The file name at each batch interval is
+  generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
+</tr>
+<tr>
   <td> <b>saveAsObjectFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
-  <td> Save this DStream's contents as a <code>SequenceFile</code> of serialized objects. The file
+  <td> Save this DStream's contents as a <code>SequenceFile</code> of serialized Java objects. The file
   name at each batch interval is generated based on <i>prefix</i> and
   <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>.
+  <br/>
+  <span class="badge" style="background-color: grey">Python API</span> This is not available in
+  the Python API.
   </td>
 </tr>
 <tr>
-  <td> <b>saveAsTextFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
-  <td> Save this DStream's contents as a text files. The file name at each batch interval is
-  generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
-</tr>
-<tr>
   <td> <b>saveAsHadoopFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
   <td> Save this DStream's contents as a Hadoop file. The file name at each batch interval is
-  generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
+  generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>.
+  <br>
+  <span class="badge" style="background-color: grey">Python API</span> This is not available in
+  the Python API.
+  </td>
 </tr>
 <tr>
   <td> <b>foreachRDD</b>(<i>func</i>) </td>
   <td> The most generic output operator that applies a function, <i>func</i>, to each RDD generated from
 the stream. This function should push the data in each RDD to an external system, like saving the RDD to
   files, or writing it over the network to a database. Note that the function <i>func</i> is executed
-  at the driver, and will usually have RDD actions in it that will force the computation of the streaming RDDs.</td>
+  in the driver process running the streaming application, and will usually have RDD actions in it
+  that will force the computation of the streaming RDDs.</td>
 </tr>
 <tr><td></td><td></td></tr>
 </table>
@@ -1079,7 +1174,7 @@ Currently, the following output operations are defined:
 However, it is important to understand how to use this primitive correctly and efficiently.
 Some of the common mistakes to avoid are as follows.
 
-- Often writing data to external system requires creating a connection object
+Often writing data to an external system requires creating a connection object
 (e.g. TCP connection to a remote server) and using it to send data to a remote system.
 For this purpose, a developer may inadvertently try creating a connection object at
 the Spark driver, but try to use it in a Spark worker to save records in the RDDs.
@@ -1087,78 +1182,78 @@ For example (in Scala),
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
-
 {% highlight scala %}
-        dstream.foreachRDD(rdd => {
-            val connection = createNewConnection()  // executed at the driver
-            rdd.foreach(record => {
-                connection.send(record) // executed at the worker
-            })
-        })
+dstream.foreachRDD { rdd =>
+  val connection = createNewConnection()  // executed at the driver
+  rdd.foreach { record =>
+    connection.send(record) // executed at the worker
+  }
+}
 {% endhighlight %}
-
 </div>
 <div data-lang="python" markdown="1">
-
 {% highlight python %}
 def sendRecord(rdd):
     connection = createNewConnection()  # executed at the driver
     rdd.foreach(lambda record: connection.send(record))
     connection.close()
-        
+
 dstream.foreachRDD(sendRecord)
 {% endhighlight %}
-
 </div>
 </div>
 
-  This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.
+This is incorrect as this requires the connection object to be serialized and sent from the
+driver to the worker. Such connection objects are rarely transferrable across machines. This
+error may manifest as serialization errors (connection object not serializable), initialization
+errors (connection object needs to be initialized at the workers), etc. The correct solution is
+to create the connection object at the worker.
 
-- However, this can lead to another common mistake - creating a new connection for every record. For example,
+However, this can lead to another common mistake - creating a new connection for every record.
+For example,
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
-
 {% highlight scala %}
-        dstream.foreachRDD(rdd => {
-            rdd.foreach(record => {
-                val connection = createNewConnection()
-                connection.send(record)
-                connection.close()
-            })
-        })
+dstream.foreachRDD { rdd =>
+  rdd.foreach { record =>
+    val connection = createNewConnection()
+    connection.send(record)
+    connection.close()
+  }
+}
 {% endhighlight %}
-
 </div>
 <div data-lang="python" markdown="1">
-
 {% highlight python %}
 def sendRecord(record):
     connection = createNewConnection()
     connection.send(record)
     connection.close()
-        
+
 dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
 {% endhighlight %}
-
 </div>
 </div>
 
-  Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use `rdd.foreachPartition` - create a single connection object and send all the records in a RDD partition using that connection.
+Typically, creating a connection object has time and resource overheads. Therefore, creating and
+destroying a connection object for each record can incur unnecessarily high overheads and can
+significantly reduce the overall throughput of the system. A better solution is to use
+`rdd.foreachPartition` - create a single connection object and send all the records in an RDD
+partition using that connection.
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-        dstream.foreachRDD(rdd => {
-            rdd.foreachPartition(partitionOfRecords => {
-                val connection = createNewConnection()
-                partitionOfRecords.foreach(record => connection.send(record))
-                connection.close()
-            })
-        })
+dstream.foreachRDD { rdd =>
+  rdd.foreachPartition { partitionOfRecords =>
+    val connection = createNewConnection()
+    partitionOfRecords.foreach(record => connection.send(record))
+    connection.close()
+  }
+}
 {% endhighlight %}
 </div>
-
 <div data-lang="python" markdown="1">
 {% highlight python %}
 def sendPartition(iter):
@@ -1166,29 +1261,29 @@ def sendPartition(iter):
     for record in iter:
         connection.send(record)
     connection.close()
-    
+
 dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
 {% endhighlight %}
 </div>
-</div>    
+</div>
 
   This amortizes the connection creation overheads over many records.
 
-- Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches.
-	One can maintain a static pool of connection objects than can be reused as
-    RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
-    
+Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches.
+One can maintain a static pool of connection objects that can be reused as
+RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
+
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-        dstream.foreachRDD(rdd => {
-            rdd.foreachPartition(partitionOfRecords => {
-                // ConnectionPool is a static, lazily initialized pool of connections
-                val connection = ConnectionPool.getConnection()
-                partitionOfRecords.foreach(record => connection.send(record))
-                ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
-            })
-        })
+dstream.foreachRDD { rdd =>
+  rdd.foreachPartition { partitionOfRecords =>
+    // ConnectionPool is a static, lazily initialized pool of connections
+    val connection = ConnectionPool.getConnection()
+    partitionOfRecords.foreach(record => connection.send(record))
+    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
+  }
+}
 {% endhighlight %}
 </div>
 
@@ -1201,11 +1296,11 @@ def sendPartition(iter):
         connection.send(record)
     # return to the pool for future reuse
     ConnectionPool.returnConnection(connection)
-    
+
 dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
 {% endhighlight %}
 </div>
-</div> 
+</div>
 
 Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.
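+
+A minimal Scala sketch of such a static, lazily created pool is shown below. `Connection` and
+`createNewConnection()` are placeholders (as in the examples above), and timeout/eviction of idle
+connections is omitted for brevity.
+
+{% highlight scala %}
+object ConnectionPool {
+  type Connection = java.sql.Connection                  // placeholder connection type
+
+  private def createNewConnection(): Connection = ???    // placeholder factory, as above
+
+  // One pool per JVM (i.e. per executor); connections are created lazily, on demand
+  private val pool = new java.util.concurrent.ConcurrentLinkedQueue[Connection]()
+
+  def getConnection(): Connection = {
+    val conn = pool.poll()
+    if (conn != null) conn else createNewConnection()
+  }
+
+  def returnConnection(conn: Connection): Unit = {
+    pool.offer(conn)   // return to the pool for future reuse
+  }
+}
+{% endhighlight %}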
 
@@ -1220,7 +1315,7 @@ Note that the connections in the pool should be lazily created on demand and tim
 
 ## Caching / Persistence
 Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is,
-using `persist()` method on a DStream would automatically persist every RDD of that DStream in
+using the `persist()` method on a DStream will automatically persist every RDD of that DStream in
 memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple
 operations on the same data). For window-based operations like `reduceByWindow` and
 `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true.
@@ -1238,49 +1333,260 @@ information on different persistence levels can be found in
 ***
 
 ## Checkpointing
-A _stateful operation_ is one which operates over multiple batches of data. This includes all
-window-based operations and the `updateStateByKey` operation. Since stateful operations have a
-dependency on previous batches of data, they continuously accumulate metadata over time.
-To clear this metadata, streaming supports periodic _checkpointing_ by saving intermediate data
-to HDFS. Note that checkpointing also incurs the cost of saving to HDFS which may cause the
-corresponding batch to take longer to process. Hence, the interval of checkpointing needs to be
-set carefully. At small batch sizes (say 1 second), checkpointing every batch may significantly
-reduce operation throughput. Conversely, checkpointing too slowly causes the lineage and task
-sizes to grow which may have detrimental effects. Typically, a checkpoint interval of 5 - 10
-times of sliding interval of a DStream is good setting to try.
-
-To enable checkpointing, the developer has to provide the HDFS path to which RDD will be saved.
-This is done by using
+A streaming application must operate 24/7 and hence must be resilient to failures unrelated
+to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible,
+Spark Streaming needs to *checkpoint* enough information to a fault-tolerant storage system
+such that it can recover from failures. There are two types of data
+that are checkpointed.
+
+- *Metadata checkpointing* - Saving of the information defining the streaming computation to
+  fault-tolerant storage like HDFS. This is used to recover from failure of the node running the
+  driver of the streaming application (discussed in detail later). Metadata includes:
+  +  *Configuration* - The configuration that was used to create the streaming application.
+  +  *DStream operations* - The set of DStream operations that define the streaming application.
+  +  *Incomplete batches* - Batches whose jobs are queued but have not completed yet.
+- *Data checkpointing* - Saving of the generated RDDs to reliable storage. This is necessary
+  in some *stateful* transformations that combine data across multiple batches. In such
+  transformations, the generated RDDs depend on RDDs of previous batches, which causes the length
+  of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery
+  time (proportional to the dependency chain), intermediate RDDs of stateful transformations are periodically
+  *checkpointed* to reliable storage (e.g. HDFS) to cut off the dependency chains.
+
+To summarize, metadata checkpointing is primarily needed for recovery from driver failures,
+whereas data or RDD checkpointing is necessary even for basic functioning if stateful
+transformations are used.
+
+#### When to enable Checkpointing
+{:.no_toc}
+
+Checkpointing must be enabled for applications with any of the following requirements:
+
+- *Usage of stateful transformations* - If either `updateStateByKey` or `reduceByKeyAndWindow` (with
+  inverse function) is used in the application, then the checkpoint directory must be provided to
+  allow periodic RDD checkpointing.
+- *Recovering from failures of the driver running the application* - Metadata checkpoints are used
+  for to recover with progress information.
+
+Note that simple streaming applications without the aforementioned stateful transformations can be
+run without enabling checkpointing. The recovery from driver failures will also be partial in
+that case (some received but unprocessed data may be lost). This is often acceptable and many run
+Spark Streaming applications in this way. Support for non-Hadoop environments is expected
+to improve in the future.
+
+#### How to configure Checkpointing
+{:.no_toc}
+
+Checkpointing can be enabled by setting a directory in a fault-tolerant,
+reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved.
+This is done by using `streamingContext.checkpoint(checkpointDirectory)`. This will allow you to
+use the aforementioned stateful transformations. Additionally,
+if you want to make the application recover from driver failures, you should rewrite your
+streaming application to have the following behavior.
+
+  + When the program is being started for the first time, it will create a new StreamingContext,
+    set up all the streams and then call start().
+  + When the program is being restarted after failure, it will re-create a StreamingContext
+    from the checkpoint data in the checkpoint directory.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
 
 {% highlight scala %}
-ssc.checkpoint(hdfsPath) // assuming ssc is the StreamingContext or JavaStreamingContext
+// Function to create and setup a new StreamingContext
+def functionToCreateContext(): StreamingContext = {
+    val ssc = new StreamingContext(...)   // new context
+    val lines = ssc.socketTextStream(...) // create DStreams
+    ...
+    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
+    ssc
+}
+
+// Get StreamingContext from checkpoint data or create a new one
+val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
+
+// Do additional setup on context that needs to be done,
+// irrespective of whether it is being started or restarted
+context. ...
+
+// Start the context
+context.start()
+context.awaitTermination()
 {% endhighlight %}
 
-The interval of checkpointing of a DStream can be set by using
+If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
+If the directory does not exist (i.e., running for the first time),
+then the function `functionToCreateContext` will be called to create a new
+context and set up the DStreams. See the Scala example
+[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
+This example appends the word counts of network data into a file.
 
-{% highlight scala %}
-dstream.checkpoint(checkpointInterval)
+</div>
+<div data-lang="java" markdown="1">
+
+This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This is used as follows.
+
+{% highlight java %}
+// Create a factory object that can create and set up a new JavaStreamingContext
+JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
+  @Override public JavaStreamingContext create() {
+    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
+    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
+    ...
+    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
+    return jssc;
+  }
+};
+
+// Get JavaStreamingContext from checkpoint data or create a new one
+JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
+
+// Do additional setup on context that needs to be done,
+// irrespective of whether it is being started or restarted
+context. ...
+
+// Start the context
+context.start();
+context.awaitTermination();
+{% endhighlight %}
+
+If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
+If the directory does not exist (i.e., running for the first time),
+then the function `contextFactory` will be called to create a new
+context and set up the DStreams. See the Java example
+[JavaRecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
+This example appends the word counts of network data into a file.
+
+</div>
+<div data-lang="python" markdown="1">
+
+This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
+
+{% highlight python %}
+# Function to create and setup a new StreamingContext
+def functionToCreateContext():
+    sc = SparkContext(...)   # new context
+    ssc = StreamingContext(...)
+    lines = ssc.socketTextStream(...) # create DStreams
+    ...
+    ssc.checkpoint(checkpointDirectory)   # set checkpoint directory
+    return ssc
+
+# Get StreamingContext from checkpoint data or create a new one
+context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
+
+# Do additional setup on context that needs to be done,
+# irrespective of whether it is being started or restarted
+context. ...
+
+# Start the context
+context.start()
+context.awaitTermination()
 {% endhighlight %}
 
-For DStreams that must be checkpointed (that is, DStreams created by `updateStateByKey` and
-`reduceByKeyAndWindow` with inverse function), the checkpoint interval of the DStream is by
-default set to a multiple of the DStream's sliding interval such that its at least 10 seconds.
+If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
+If the directory does not exist (i.e., running for the first time),
+then the function `functionToCreateContext` will be called to create a new
+context and set up the DStreams. See the Python example
+[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+This example appends the word counts of network data into a file.
+
+You can also explicitly create a `StreamingContext` from the checkpoint data and start the
+computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
+
+</div>
+</div>
+
+In addition to using `getOrCreate` one also needs to ensure that the driver process gets
+restarted automatically on failure. This can only be done by the deployment infrastructure that is
+used to run the application. This is further discussed in the
+[Deployment](#deploying-applications) section.
+
+Note that checkpointing of RDDs incurs the cost of saving to reliable storage.
+This may cause an increase in the processing time of those batches where RDDs get checkpointed.
+Hence, the interval of
+checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every
+batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently
+causes the lineage and task sizes to grow which may have detrimental effects. For stateful
+transformations that require RDD checkpointing, the default interval is a multiple of the
+batch interval that is at least 10 seconds. It can be set by using
+`dstream.checkpoint(checkpointInterval)`. Typically, a checkpoint interval of 5 - 10 sliding intervals
+of a DStream is a good setting to try.
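+
+For example, with a 10 second batch interval, a DStream could be checkpointed every 50 seconds
+as follows (the DStream name is illustrative):
+
+{% highlight scala %}
+stateDStream.checkpoint(Seconds(50))   // checkpoint this DStream's RDDs every 50 seconds
+{% endhighlight %}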
 
 ***
 
 ## Deploying Applications
-A Spark Streaming application is deployed on a cluster in the same way as any other Spark application.
-Please refer to the [deployment guide](cluster-overview.html) for more details.
+This section discusses the steps to deploy a Spark Streaming application.
 
-Note that the applications
-that use [advanced sources](#advanced-sources) (e.g. Kafka, Flume, Twitter) are also required to package the
-extra artifact they link to, along with their dependencies, in the JAR that is used to deploy the application.
-For example, an application using `TwitterUtils` will have to include
-`spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and all its transitive
-dependencies in the application JAR.
+### Requirements
+{:.no_toc}
 
-If a running Spark Streaming application needs to be upgraded (with new application code), then
-there are two possible mechanism.
+To run a Spark Streaming application, you need to have the following.
+
+- *Cluster with a cluster manager* - This is the general requirement of any Spark application,
+  and is discussed in detail in the [deployment guide](cluster-overview.html).
+
+- *Package the application JAR* - You have to compile your streaming application into a JAR.
+  If you are using [`spark-submit`](submitting-applications.html) to start the
+  application, then you will not need to provide Spark and Spark Streaming in the JAR. However,
+  if your application uses [advanced sources](#advanced-sources) (e.g. Kafka, Flume, Twitter),
+  then you will have to package the extra artifact they link to, along with their dependencies,
+  in the JAR that is used to deploy the application. For example, an application using `TwitterUtils`
+  will have to include `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and all its
+  transitive dependencies in the application JAR.
+
+- *Configuring sufficient memory for the executors* - Since the received data must be stored in
+  memory, the executors must be configured with sufficient memory to hold the received data. Note
+  that if you are doing 10 minute window operations, the system has to keep at least the last
+  10 minutes of data in memory. So the memory requirements for the application depend on the
+  operations used in it.
+
+- *Configuring checkpointing* - If the stream application requires it, then a directory in the
+  Hadoop API compatible fault-tolerant storage (e.g. HDFS, S3, etc.) must be configured as the
+  checkpoint directory and the streaming application written in a way that checkpoint
+  information can be used for failure recovery. See the [checkpointing](#checkpointing) section
+  for more details.
+
+- *Configuring automatic restart of the application driver* - To automatically recover from a
+  driver failure, the deployment infrastructure that is
+  used to run the streaming application must monitor the driver process and relaunch the driver
+  if it fails. Different [cluster managers](cluster-overview.html#cluster-manager-types)
+  have different tools to achieve this.
+    + *Spark Standalone* - A Spark application driver can be submitted to run within the Spark
+      Standalone cluster (see
+      [cluster deploy mode](spark-standalone.html#launching-spark-applications)), that is, the
+      application driver itself runs on one of the worker nodes. Furthermore, the
+      Standalone cluster manager can be instructed to *supervise* the driver,
+      and relaunch it if the driver fails either due to non-zero exit code,
+      or due to failure of the node running the driver. See *cluster mode* and *supervise* in the
+      [Spark Standalone guide](spark-standalone.html) for more details.
+    + *YARN* - YARN supports a similar mechanism for automatically restarting an application.
+      Please refer to YARN documentation for more details.
+    + *Mesos* - [Marathon](https://github.com/mesosphere/marathon) has been used to achieve this
+      with Mesos.
+
+
+- *[Experimental in Spark 1.2] Configuring write ahead logs* - In Spark 1.2,
+  we have introduced a new experimental feature of write ahead logs for achieving strong
+  fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into
+  a write ahead log in the configured checkpoint directory. This prevents data loss on driver
+  recovery, thus ensuring zero data loss (discussed in detail in the
+  [Fault-tolerance Semantics](#fault-tolerance-semantics) section). This can be enabled by setting
+  the [configuration parameter](configuration.html#spark-streaming)
+  `spark.streaming.receiver.writeAheadLogs.enable` to `true`. However, these stronger semantics may
+  come at the cost of the receiving throughput of individual receivers. This can be corrected by
+  running [more receivers in parallel](#level-of-parallelism-in-data-receiving)
+  to increase aggregate throughput. Additionally, it is recommended that the replication of the
+  received data within Spark be disabled when the write ahead log is enabled as the log is already
+  stored in a replicated storage system. This can be done by setting the storage level for the
+  input stream to `StorageLevel.MEMORY_AND_DISK_SER`.
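+
+  For example, a receiver-based input DStream could be created with non-replicated, serialized
+  storage as sketched below (assuming the `ssc` from the earlier examples; hostname and port are
+  illustrative).
+
+{% highlight scala %}
+import org.apache.spark.storage.StorageLevel
+
+val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
+{% endhighlight %}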
+
+### Upgrading Application Code
+{:.no_toc}
+
+If a running Spark Streaming application needs to be upgraded with new
+application code, then there are two possible mechanisms.
 
 - The upgraded Spark Streaming application is started and run in parallel to the existing application.
 Once the new one (receiving the same data as the old one) has been warmed up and ready
@@ -1294,8 +1600,18 @@ for graceful shutdown options) which ensure data that have been received is comp
 processed before shutdown. Then the
 upgraded application can be started, which will start processing from the same point where the earlier
 application left off. Note that this can be done only with input sources that support source-side buffering
-(like Kafka, and Flume) as data needs to be buffered while the previous application down and
-the upgraded application is not yet up.
+(like Kafka and Flume) as data needs to be buffered while the previous application is down and
+the upgraded application is not yet up. Also, restarting from the earlier checkpoint
+information of the pre-upgrade code cannot be done. The checkpoint information essentially
+contains serialized Scala/Java/Python objects, and trying to deserialize those objects with new,
+modified classes may lead to errors. In this case, either start the upgraded application with a
+different checkpoint directory, or delete the previous checkpoint directory.
+
+### Other Considerations
+{:.no_toc}
+If the data is being received by the receivers faster than it can be processed,
+you can limit the rate by setting the [configuration parameter](configuration.html#spark-streaming)
+`spark.streaming.receiver.maxRate`.
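+
+For example, a short Scala sketch of setting this limit (the value of 10000 records per second is
+purely illustrative):
+
+{% highlight scala %}
+import org.apache.spark.SparkConf
+
+val conf = new SparkConf()
+  .set("spark.streaming.receiver.maxRate", "10000")   // at most 10000 records/sec per receiver
+{% endhighlight %}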
 
 ***
 
@@ -1308,11 +1624,14 @@ receivers are active, number of records received, receiver error, etc.)
 and completed batches (batch processing times, queueing delays, etc.). This can be used to
 monitor the progress of the streaming application.
 
-The following two metrics in web UI is particularly important -
-*Processing Time* and *Scheduling Delay* (under *Batch Processing Statistics*). The first is the
-time to process each batch of data, and the second is the time a batch waits in a queue
-for the processing of previous batches to finish. If the batch processing time is consistently more
-than the batch interval and/or the queueing delay keeps increasing, then it indicates the system is
+The following two metrics in web UI are particularly important:
+
+- *Processing Time* - The time to process each batch of data.
+- *Scheduling Delay* - The time a batch waits in a queue for the processing of previous batches
+  to finish.
+
+If the batch processing time is consistently more than the batch interval and/or the queueing
+delay keeps increasing, then it indicates the system is
 not able to process the batches as fast as they are being generated and is falling behind.
 In that case, consider
 [reducing](#reducing-the-processing-time-of-each-batch) the batch processing time.
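+
+Beyond the web UI, these two metrics can also be watched programmatically. A rough Scala sketch
+using the `StreamingListener` developer API (assuming an already created StreamingContext `ssc`):
+
+{% highlight scala %}
+import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
+
+// Prints the scheduling delay and processing time of every completed batch.
+class BatchStatsListener extends StreamingListener {
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+    val info = batchCompleted.batchInfo
+    println(s"Batch ${info.batchTime}: scheduling delay = ${info.schedulingDelay.getOrElse(-1L)} ms, " +
+      s"processing time = ${info.processingDelay.getOrElse(-1L)} ms")
+  }
+}
+
+ssc.addStreamingListener(new BatchStatsListener())
+{% endhighlight %}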
@@ -1376,13 +1695,18 @@ unifiedStream.print();
 </div>
 </div>
 
-
-Another parameter that should be considered is the receiver's blocking interval. For most receivers,
-the received data is coalesced together into large blocks of data before storing inside Spark's memory.
-The number of blocks in each batch determines the number of tasks that will be used to process those
-the received data in a map-like transformation. This blocking interval is determined by the
-[configuration parameter](configuration.html) `spark.streaming.blockInterval` and the default value
-is 200 milliseconds.
+Another parameter that should be considered is the receiver's block interval,
+which is determined by the [configuration parameter](configuration.html#spark-streaming)
+`spark.streaming.blockInterval`. For most receivers, the received data is coalesced together into
+blocks of data before storing inside Spark's memory. The number of blocks in each batch
+determines the number of tasks that will be used to process
+the received data in a map-like transformation. The number of tasks per receiver per batch will be
+approximately (batch interval / block interval). For example, a block interval of 200 ms will
+create 10 tasks per 2 second batch. If the number of tasks is too low (that is, less than the number
+of cores per machine), then it will be inefficient as not all the available cores will be used to
+process the data. To increase the number of tasks for a given batch interval, reduce the
+block interval. However, the recommended minimum value of the block interval is about 50 ms,
+below which the task launching overheads may be a problem.
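+
+As a rough Scala sketch, with a 2 second batch interval and a 100 ms block interval each receiver
+generates about 2000 / 100 = 20 blocks, and hence about 20 map tasks, per batch (the application
+name and the specific values are only illustrative):
+
+{% highlight scala %}
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+val conf = new SparkConf()
+  .setAppName("BlockIntervalExample")            // placeholder application name
+  .set("spark.streaming.blockInterval", "100")   // block interval in milliseconds
+val ssc = new StreamingContext(conf, Seconds(2)) // 2 second batch interval
+{% endhighlight %}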
 
 An alternative to receiving data with multiple input streams / receivers is to explicitly repartition
 the input data stream (using `inputStream.repartition(<number of partitions>)`).
@@ -1393,12 +1717,12 @@ before further processing.
 {:.no_toc}
 Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the
 computation is not high enough. For example, for distributed reduce operations like `reduceByKey`
-and `reduceByKeyAndWindow`, the default number of parallel tasks is decided by the [config property]
-(configuration.html#spark-properties) `spark.default.parallelism`. You can pass the level of
-parallelism as an argument (see [`PairDStreamFunctions`]
-(api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions)
-documentation), or set the [config property](configuration.html#spark-properties)
-`spark.default.parallelism` to change the default.
+and `reduceByKeyAndWindow`, the default number of parallel tasks is controlled by
+the `spark.default.parallelism` [configuration property](configuration.html#spark-properties). You
+can pass the level of parallelism as an argument (see
+[`PairDStreamFunctions`](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions)
+documentation), or set the `spark.default.parallelism`
+[configuration property](configuration.html#spark-properties) to change the default.
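+
+For example, a rough Scala sketch of passing the parallelism explicitly to `reduceByKeyAndWindow`
+(`words` is assumed to be an existing `DStream[String]`, and 32 tasks is an arbitrary choice):
+
+{% highlight scala %}
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext._   // for the pair DStream operations
+
+val pairs = words.map(word => (word, 1))
+val windowedCounts = pairs.reduceByKeyAndWindow(
+  (a: Int, b: Int) => a + b,   // reduce function
+  Seconds(30),                 // window duration
+  Seconds(10),                 // slide duration
+  32)                          // number of parallel reduce tasks
+{% endhighlight %}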
 
 ### Data Serialization
 {:.no_toc}
@@ -1493,294 +1817,120 @@ consistent batch processing times.
 ***************************************************************************************************
 ***************************************************************************************************
 
-# Fault-tolerance Properties
-In this section, we are going to discuss the behavior of Spark Streaming application in the event
-of a node failure. To understand this, let us remember the basic fault-tolerance properties of
+# Fault-tolerance Semantics
+In this section, we will discuss the behavior of Spark Streaming applications in the event
+of node failures. To understand this, let us remember the basic fault-tolerance semantics of
 Spark's RDDs.
 
- 1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD
- remembers the lineage of deterministic operations that were used on a fault-tolerant input
- dataset to create it.
- 1. If any partition of an RDD is lost due to a worker node failure, then that partition can be
- re-computed from the original fault-tolerant dataset using the lineage of operations.
-
-Since all data transformations in Spark Streaming are based on RDD operations, as long as the input
-dataset is present, all intermediate data can recomputed. Keeping these properties in mind, we are
-going to discuss the failure semantics in more detail.
-
-## Failure of a Worker Node
-There are two failure behaviors based on which input sources are used.
-
-1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can
-re-computed and therefore no data will be lost due to any failure.
-1. _Using any input source that receives data through a network_ - For network-based data sources
-like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster
-(default replication factor is 2). So if a worker node fails, then the system can recompute the
-lost from the the left over copy of the input data. However, if the worker node where a network
-receiver was running fails, then a tiny bit of data may be lost, that is, the data received by
-the system but not yet replicated to other node(s). The receiver will be started on a different
-node and it will continue to receive data.
+1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD
+remembers the lineage of deterministic operations that were used on a fault-tolerant input
+dataset to create it.
+1. If any partition of an RDD is lost due to a worker node failure, then that partition can be
+re-computed from the original fault-tolerant dataset using the lineage of operations.
+1. Assuming that all of the RDD transformations are deterministic, the data in the final transformed
+   RDD will always be the same irrespective of failures in the Spark cluster.
+
+Spark operates on data on fault-tolerant file systems like HDFS or S3. Hence,
+all of the RDDs generated from the fault-tolerant data are also fault-tolerant. However, this is not
+the case for Spark Streaming as the data in most cases is received over the network (except when
+`fileStream` is used). To achieve the same fault-tolerance properties for all of the generated RDDs,
+the received data is replicated among multiple Spark executors in worker nodes in the cluster
+(default replication factor is 2). This leads to two kinds of data in the
+system that need to be recovered in the event of failures:
+
+1. *Data received and replicated* - This data survives failure of a single worker node as a copy
+  of it exists on one of the nodes.
+1. *Data received but buffered for replication* - Since this is not replicated,
+   the only way to recover that data is to get it again from the source.
+
+Furthermore, there are two kinds of failures that we should be concerned about:
+
+1. *Failure of a Worker Node* - Any of the worker nodes running executors can fail,
+   and all in-memory data on those nodes will be lost. If any receivers were running on failed
+   nodes, then their buffered data will be lost.
+1. *Failure of the Driver Node* - If the driver node running the Spark Streaming application
+   fails, then obviously the SparkContext is lost, and all executors with their in-memory
+   data are lost.
+
+With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.
+
+## Semantics with files as input source
+{:.no_toc}
+If all of the input data is already present in a fault-tolerant file system like
+HDFS, Spark Streaming can always recover from any failure and process all the data. This gives
+*exactly-once* semantics, meaning that all the data will be processed exactly once no matter what fails.
+
+## Semantics with input sources based on receivers
+{:.no_toc}
+For input sources based on receivers, the fault-tolerance semantics depend on both the failure
+scenario and the type of receiver.
+As we discussed [earlier](#receiver-reliability), there are two types of receivers:
+
+1. *Reliable Receiver* - These receivers acknowledge reliable sources only after ensuring that
+  the received data has been replicated. If such a receiver fails,
+  the buffered (unreplicated) data does not get acknowledged to the source. If the receiver is
+  restarted, the source will resend the data, and therefore no data will be lost due to the failure.
+1. *Unreliable Receiver* - Such receivers can lose data when they fail due to worker
+  or driver failures.
+
+Depending on what type of receivers are used, we achieve the following semantics.
+If a worker node fails, then there is no data loss with reliable receivers. With unreliable
+receivers, data received but not replicated can get lost. If the driver node fails,
+then besides these losses, all the past data that was received and replicated in memory will be
+lost. This will affect the results of the stateful transformations.
+
+To avoid this loss of past received data, Spark 1.2 introduces an experimental feature of _write
+ahead logs_ which saves the received data to fault-tolerant storage. With the [write ahead logs
+enabled](#deploying-applications) and reliable receivers, there is zero data loss and
+exactly-once semantics.
+
+The following table summarizes the semantics under failures:
 
+<table class="table">
+  <tr>
+    <th style="width:30%">Deployment Scenario</th>
+    <th>Worker Failure</th>
+    <th>Driver Failure</th>
+  </tr>
+  <tr>
+    <td>
+      <b>Spark 1.1 or earlier, or</b><br/>
+      <b>Spark 1.2 without write ahead log</b>
+    </td>
+    <td>
+      Buffered data lost with unreliable receivers<br/>
+      Zero data loss with reliable receivers and files<br/>
+    </td>
+    <td>
+      Buffered data lost with unreliable receivers<br/>
+      Past data lost with all receivers<br/>
+      Zero data loss with files
+      </td>
+  </tr>
+  <tr>
+    <td><b>Spark 1.2 with write ahead log</b></td>
+    <td>Zero data loss with reliable receivers and files</td>
+    <td>Zero data loss with reliable receivers and files</td>
+  </tr>
+  <tr>
+    <td></td>
+    <td></td>
+    <td></td>
+  </tr>
+</table>
+
+## Semantics of output operations
+{:.no_toc}
 Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation
  always leads to the same result. As a result, all DStream transformations are guaranteed to have
 _exactly-once_ semantics. That is, the final transformed result will be the same even if there
 was a worker node failure. However, output operations (like `foreachRDD`) have _at-least once_
  semantics, that is, the transformed data may get written to an external entity more than once in
  the event of a worker failure. While this is acceptable for saving to HDFS using the
- `saveAs*Files` operations (as the file will simply get over-written by the same data),
+ `saveAs***Files` operations (as the file will simply get over-written by the same data),
  additional transactions-like mechanisms may be necessary to achieve exactly-once semantics
  for output operations.
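+
+For example, one common approach is to key each write on the batch time and partition index, so
+that a retried write overwrites (rather than duplicates) earlier output. A rough Scala sketch,
+where `wordCounts` is an assumed DStream of counts and `writeTransactionally` stands in for a
+sink-specific call:
+
+{% highlight scala %}
+wordCounts.foreachRDD { (rdd, batchTime) =>
+  val committed = rdd.mapPartitionsWithIndex { (partitionIndex, records) =>
+    // (batchTime, partitionIndex) uniquely identifies this slice of output across retries,
+    // so an external store can use it to commit the records idempotently / transactionally.
+    val commitId = s"${batchTime.milliseconds}-$partitionIndex"
+    // writeTransactionally(commitId, records)   // hypothetical sink-specific call
+    Iterator.single(commitId)
+  }
+  committed.count()   // force evaluation so that the writes actually happen
+}
+{% endhighlight %}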
 
-## Failure of the Driver Node
-For a streaming application to operate 24/7, Spark Streaming allows a streaming computation
-to be resumed even after the failure of the driver node. Spark Streaming periodically writes the
-metadata information of the DStreams setup through the `StreamingContext` to a
-HDFS directory (can be any Hadoop-compatible filesystem). This periodic
-*checkpointing* can be enabled by setting the checkpoint
-directory using `ssc.checkpoint(<checkpoint directory>)` as described
-[earlier](#rdd-checkpointing). On failure of the driver node,
-the lost `StreamingContext` can be recovered from this information, and restarted.
-
-To allow a Spark Streaming program to be recoverable, it must be written in a way such that
-it has the following behavior:
-
-1.  When the program is being started for the first time, it will create a new StreamingContext,
-    set up all the streams and then call start().
-1.  When the program is being restarted after failure, it will re-create a StreamingContext
-    from the checkpoint data in the checkpoint directory.
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-
-This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
-
-{% highlight scala %}
-// Function to create and setup a new StreamingContext
-def functionToCreateContext(): StreamingContext = {
-    val ssc = new StreamingContext(...)   // new context
-    val lines = ssc.socketTextStream(...) // create DStreams
-    ...
-    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
-    ssc
-}
-
-// Get StreamingContext from checkpoint data or create a new one
-val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
-
-// Do additional setup on context that needs to be done,
-// irrespective of whether it is being started or restarted
-context. ...
-
-// Start the context
-context.start()
-context.awaitTermination()
-{% endhighlight %}
-
-If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
-If the directory does not exist (i.e., running for the first time),
-then the function `functionToCreateContext` will be called to create a new
-context and set up the DStreams. See the Scala example
-[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
-This example appends the word counts of network data into a file.
-
-You can also explicitly create a `StreamingContext` from the checkpoint data and start the
- computation by using `new StreamingContext(checkpointDirectory)`.
-
-</div>
-<div data-lang="java" markdown="1">
-
-This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This is used as follows.
-
-{% highlight java %}
-// Create a factory object that can create a and setup a new JavaStreamingContext
-JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
-  @Override public JavaStreamingContext create() {
-    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
-    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
-    ...
-    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
-    return jssc;
-  }
-};
-
-// Get JavaStreamingContext from checkpoint data or create a new one
-JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
-
-// Do additional setup on context that needs to be done,
-// irrespective of whether it is being started or restarted
-context. ...
-
-// Start the context
-context.start();
-context.awaitTermination();
-{% endhighlight %}
-
-If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
-If the directory does not exist (i.e., running for the first time),
-then the function `contextFactory` will be called to create a new
-context and set up the DStreams.
-
-You can also explicitly create a `JavaStreamingContext` from the checkpoint data and start
-the computation by using `new JavaStreamingContext(checkpointDirectory)`.
-
-</div>
-<div data-lang="python" markdown="1">
-
-This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
-
-{% highlight python %}
-# Function to create and setup a new StreamingContext
-def functionToCreateContext():
-    sc = SparkContext(...)   # new context
-    ssc = new StreamingContext(...)  
-    lines = ssc.socketTextStream(...) # create DStreams
-    ...
-    ssc.checkpoint(checkpointDirectory)   # set checkpoint directory
-    return ssc
-
-# Get StreamingContext from checkpoint data or create a new one
-context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
-
-# Do additional setup on context that needs to be done,
-# irrespective of whether it is being started or restarted
-context. ...
-
-# Start the context
-context.start()
-context.awaitTermination()
-{% endhighlight %}
-
-If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
-If the directory does not exist (i.e., running for the first time),
-then the function `functionToCreateContext` will be called to create a new
-context and set up the DStreams. See the Python example
-[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
-This example appends the word counts of network data into a file.
-
-You can also explicitly create a `StreamingContext` from the checkpoint data and start the
- computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
-
-</div>
-
-</div>
-
-**Note**: If Spark Streaming and/or the Spark Streaming program is recompiled,
-you *must* create a new `StreamingContext` or `JavaStreamingContext`,
-not recreate from checkpoint data. This is because trying to load a
-context from checkpoint data may fail if the data was generated before recompilation of the
-classes. So, if you are using `getOrCreate`, then make sure that the checkpoint directory is
-explicitly deleted every time recompiled code needs to be launched.
-
-This failure recovery can be done automatically using Spark's
-[standalone cluster mode](spark-standalone.html), which allows the driver of any Spark application
-to be launched within the cluster and be restarted on failure (see
-[supervise mode](spark-standalone.html#launching-applications-inside-the-cluster)). This can be
-tested locally by launching the above example using the supervise mode in a
-local standalone cluster and killing the java process running the driver (will be shown as
-*DriverWrapper* when `jps` is run to show all active Java processes). The driver should be
-automatically restarted, and the word counts will cont
-
-For other deployment environments like Mesos and Yarn, you have to restart the driver through other
-mechanisms.
-
-#### Recovery Semantics
-{:.no_toc}
-
-There are two different failure behaviors based on which input sources are used.
-
-1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can
-re-computed and therefore no data will be lost due to any failure.
-1. _Using any input source that receives data through a network_ - The received input data is
-replicated in memory to multiple nodes. Since all the data in the Spark worker's memory is lost
-when the Spark driver fails, the past input data will not be accessible and driver recovers.
-Hence, if stateful and window-based operations are used
-(like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state
-will not be recovered completely.
-
-In future releases, we will support full recoverability for all input sources. Note that for
-non-stateful transformations like `map`, `count`, and `reduceByKey`, with _all_ input streams,
-the system, upon restarting, will continue to receive and process new data.
-
-To better understand the behavior of the system under driver failure with a HDFS source, let's
-consider what will happen with a file input stream. Specifically, in the case of the file input
-stream, it will correctly identify new files that were created while the driver was down and
-process them in the same way as it would have if the driver had not failed. To explain further
-in the case of file input stream, we shall use an example. Let's say, files are being generated
-every second, and a Spark Streaming program reads every new file and output the number of lines
-in the file. This is what the sequence of outputs would be with and without a driver failure.
-
-<table class="table">
-    <!-- Results table headers -->
-    <tr>
-      <th> Time </th>
-      <th> Number of lines in input file </th>
-      <th> Output without driver failure </th>
-      <th> Output with driver failure </th>
-    </tr>
-    <tr>
-      <td>1</td>
-      <td>10</td>
-      <td>10</td>
-      <td>10</td>
-    </tr>
-    <tr>
-      <td>2</td>
-      <td>20</td>
-      <td>20</td>
-      <td>20</td>
-    </tr>
-    <tr>
-      <td>3</td>
-      <td>30</td>
-      <td>30</td>
-      <td>30</td>
-    </tr>
-    <tr>
-      <td>4</td>
-      <td>40</td>
-      <td>40</td>
-      <td>[DRIVER FAILS]<br />no output</td>
-    </tr>
-    <tr>
-      <td>5</td>
-      <td>50</td>
-      <td>50</td>
-      <td>no output</td>
-    </tr>
-    <tr>
-      <td>6</td>
-      <td>60</td>
-      <td>60</td>
-      <td>no output</td>
-    </tr>
-    <tr>
-      <td>7</td>
-      <td>70</td>
-      <td>70</td>
-      <td>[DRIVER RECOVERS]<br />40, 50, 60, 70</td>
-    </tr>
-    <tr>
-      <td>8</td>
-      <td>80</td>
-      <td>80</td>
-      <td>80</td>
-    </tr>
-    <tr>
-      <td>9</td>
-      <td>90</td>
-      <td>90</td>
-      <td>90</td>
-    </tr>
-    <tr>
-      <td>10</td>
-      <td>100</td>
-      <td>100</td>
-      <td>100</td>
-    </tr>
-</table>
-
-If the driver had crashed in the middle of the processing of time 3, then it will process time 3
-and output 30 after recovery.
 
 ***************************************************************************************************
 ***************************************************************************************************
@@ -1864,5 +2014,5 @@ package and renamed for better clarity.
 
 * More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
   and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)
-  and [Python] ({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming)
+  and [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming)
 * [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and [video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.

http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/submitting-applications.md
----------------------------------------------------------------------
diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md
index 45b70b1..2581c9f 100644
--- a/docs/submitting-applications.md
+++ b/docs/submitting-applications.md
@@ -43,17 +43,18 @@ Some of the commonly used options are:
 
 * `--class`: The entry point for your application (e.g. `org.apache.spark.examples.SparkPi`)
 * `--master`: The [master URL](#master-urls) for the cluster (e.g. `spark://23.195.26.187:7077`)
-* `--deploy-mode`: Whether to deploy your driver on the worker nodes (`cluster`) or locally as an external client (`client`) (default: `client`)*
+* `--deploy-mode`: Whether to deploy your driver on the worker nodes (`cluster`) or locally as an external client (`client`) (default: `client`) <b> &#8224; </b>
 * `--conf`: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap "key=value" in quotes (as shown).
 * `application-jar`: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an `hdfs://` path or a `file://` path that is present on all nodes.
 * `application-arguments`: Arguments passed to the main method of your main class, if any
 
-*A common deployment strategy is to submit your application from a gateway machine that is
+<b>&#8224;</b> A common deployment strategy is to submit your application from a gateway machine
+that is
 physically co-located with your worker machines (e.g. Master node in a standalone EC2 cluster).
 In this setup, `client` mode is appropriate. In `client` mode, the driver is launched directly
-within the client `spark-submit` process, with the input and output of the application attached
-to the console. Thus, this mode is especially suitable for applications that involve the REPL
-(e.g. Spark shell).
+within the `spark-submit` process, which acts as a *client* to the cluster. The input and
+output of the application are attached to the console. Thus, this mode is especially suitable
+for applications that involve the REPL (e.g. Spark shell).
 
 Alternatively, if your application is submitted from a machine far from the worker machines (e.g.
 locally on your laptop), it is common to use `cluster` mode to minimize network latency between
@@ -63,8 +64,12 @@ clusters, Mesos clusters, or python applications.
 For Python applications, simply pass a `.py` file in the place of `<application-jar>` instead of a JAR,
 and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`.
 
-To enumerate all options available to `spark-submit` run it with `--help`. Here are a few
-examples of common options:
+There are a few options available that are specific to the
+[cluster manager](cluster-overview.html#cluster-manager-types) that is being used.
+For example, with a [Spark Standalone](spark-standalone.html) cluster in `cluster` deploy mode,
+you can also specify `--supervise` to make sure that the driver is automatically restarted if it
+fails with a non-zero exit code. To enumerate all such options available to `spark-submit`,
+run it with `--help`. Here are a few examples of common options:
 
 {% highlight bash %}
 # Run application locally on 8 cores
@@ -74,7 +79,7 @@ examples of common options:
   /path/to/examples.jar \
   100
 
-# Run on a Spark standalone cluster
+# Run on a Spark Standalone cluster in client deploy mode
 ./bin/spark-submit \
   --class org.apache.spark.examples.SparkPi \
   --master spark://207.184.161.138:7077 \
@@ -83,6 +88,17 @@ examples of common options:
   /path/to/examples.jar \
   1000
 
+# Run on a Spark Standalone cluster in cluster deploy mode with supervise
+./bin/spark-submit \
+  --class org.apache.spark.examples.SparkPi \
+  --master spark://207.184.161.138:7077 \
+  --deploy-mode cluster \
+  --supervise \
+  --executor-memory 20G \
+  --total-executor-cores 100 \
+  /path/to/examples.jar \
+  1000
+
 # Run on a YARN cluster
 export HADOOP_CONF_DIR=XXX
 ./bin/spark-submit \
@@ -93,7 +109,7 @@ export HADOOP_CONF_DIR=XXX
   /path/to/examples.jar \
   1000
 
-# Run a Python application on a cluster
+# Run a Python application on a Spark Standalone cluster
 ./bin/spark-submit \
   --master spark://207.184.161.138:7077 \
   examples/src/main/python/pi.py \
@@ -163,5 +179,5 @@ to executors.
 
 # More Information
 
-Once you have deployed your application, the [cluster mode overview](cluster-overview.html) describes 
+Once you have deployed your application, the [cluster mode overview](cluster-overview.html) describes
 the components involved in distributed execution, and how to monitor and debug applications.




[2/2] spark git commit: [SPARK-4806] Streaming doc update for 1.2

Posted by td...@apache.org.
[SPARK-4806] Streaming doc update for 1.2

Important updates to the streaming programming guide
- Make the fault-tolerance properties easier to understand, with information about write ahead logs
- Update the information about deploying the spark streaming app with information about Driver HA
- Update Receiver guide to discuss reliable vs unreliable receivers.

Author: Tathagata Das <ta...@gmail.com>
Author: Josh Rosen <jo...@databricks.com>
Author: Josh Rosen <ro...@gmail.com>

Closes #3653 from tdas/streaming-doc-update-1.2 and squashes the following commits:

f53154a [Tathagata Das] Addressed Josh's comments.
ce299e4 [Tathagata Das] Minor update.
ca19078 [Tathagata Das] Minor change
f746951 [Tathagata Das] Mentioned performance problem with WAL
7787209 [Tathagata Das] Merge branch 'streaming-doc-update-1.2' of github.com:tdas/spark into streaming-doc-update-1.2
2184729 [Tathagata Das] Updated Kafka and Flume guides with reliability information.
2f3178c [Tathagata Das] Added more information about writing reliable receivers in the custom receiver guide.
91aa5aa [Tathagata Das] Improved API Docs menu
5707581 [Tathagata Das] Added Pythn API badge
b9c8c24 [Tathagata Das] Merge pull request #26 from JoshRosen/streaming-programming-guide
b8c8382 [Josh Rosen] minor fixes
a4ef126 [Josh Rosen] Restructure parts of the fault-tolerance section to read a bit nicer when skipping over the headings
65f66cd [Josh Rosen] Fix broken link to fault-tolerance semantics section.
f015397 [Josh Rosen] Minor grammar / pluralization fixes.
3019f3a [Josh Rosen] Fix minor Markdown formatting issues
aa8bb87 [Tathagata Das] Small update.
195852c [Tathagata Das] Updated based on Josh's comments, updated receiver reliability and deploying section, and also updated configuration.
17b99fb [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-doc-update-1.2
a0217c0 [Tathagata Das] Changed Deploying menu layout
67fcffc [Tathagata Das] Added cluster mode + supervise example to submitting application guide.
e45453b [Tathagata Das] Update streaming guide, added deploying section.
192c7a7 [Tathagata Das] Added more info about Python API, and rewrote the checkpointing section.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b004150a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b004150a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b004150a

Branch: refs/heads/master
Commit: b004150adb503ddbb54d5cd544e39ad974497c41
Parents: 2a5b5fd
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 11 06:21:23 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 11 06:21:23 2014 -0800

----------------------------------------------------------------------
 docs/_layouts/global.html           |   13 +-
 docs/configuration.md               |  133 ++--
 docs/streaming-custom-receivers.md  |   90 ++-
 docs/streaming-flume-integration.md |   13 +-
 docs/streaming-kafka-integration.md |   17 +
 docs/streaming-programming-guide.md | 1068 +++++++++++++++++-------------
 docs/submitting-applications.md     |   36 +-
 7 files changed, 819 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/_layouts/global.html
----------------------------------------------------------------------
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 627ed37..8841f76 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -33,7 +33,7 @@
         <!-- Google analytics script -->
         <script type="text/javascript">
           var _gaq = _gaq || [];
-          _gaq.push(['_setAccount', 'UA-32518208-1']);
+          _gaq.push(['_setAccount', 'UA-32518208-2']);
           _gaq.push(['_trackPageview']);
 
           (function() {
@@ -79,9 +79,9 @@
                         <li class="dropdown">
                             <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
                             <ul class="dropdown-menu">
-                                <li><a href="api/scala/index.html#org.apache.spark.package">Scaladoc</a></li>
-                                <li><a href="api/java/index.html">Javadoc</a></li>
-                                <li><a href="api/python/index.html">Python API</a></li>
+                                <li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li>
+                                <li><a href="api/java/index.html">Java</a></li>
+                                <li><a href="api/python/index.html">Python</a></li>
                             </ul>
                         </li>
 
@@ -91,10 +91,11 @@
                                 <li><a href="cluster-overview.html">Overview</a></li>
                                 <li><a href="submitting-applications.html">Submitting Applications</a></li>
                                 <li class="divider"></li>
-                                <li><a href="ec2-scripts.html">Amazon EC2</a></li>
-                                <li><a href="spark-standalone.html">Standalone Mode</a></li>
+                                <li><a href="spark-standalone.html">Spark Standalone</a></li>
                                 <li><a href="running-on-mesos.html">Mesos</a></li>
                                 <li><a href="running-on-yarn.html">YARN</a></li>
+                                <li class="divider"></li>
+                                <li><a href="ec2-scripts.html">Amazon EC2</a></li>
                             </ul>
                         </li>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index d50b046..acee267 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -8,7 +8,7 @@ title: Spark Configuration
 Spark provides three locations to configure the system:
 
 * [Spark properties](#spark-properties) control most application parameters and can be set by using
-  a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object, or through Java
+  a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object, or through Java
   system properties.
 * [Environment variables](#environment-variables) can be used to set per-machine settings, such as
   the IP address, through the `conf/spark-env.sh` script on each node.
@@ -23,8 +23,8 @@ application. These properties can be set directly on a
 (e.g. master URL and application name), as well as arbitrary key-value pairs through the
 `set()` method. For example, we could initialize an application with two threads as follows:
 
-Note that we run with local[2], meaning two threads - which represents "minimal" parallelism, 
-which can help detect bugs that only exist when we run in a distributed context. 
+Note that we run with local[2], meaning two threads - which represents "minimal" parallelism,
+which can help detect bugs that only exist when we run in a distributed context.
 
 {% highlight scala %}
 val conf = new SparkConf()
@@ -35,7 +35,7 @@ val sc = new SparkContext(conf)
 {% endhighlight %}
 
 Note that we can have more than 1 thread in local mode, and in cases like spark streaming, we may actually
-require one to prevent any sort of starvation issues.  
+require one to prevent any sort of starvation issues.
 
 ## Dynamically Loading Spark Properties
 In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
@@ -48,8 +48,8 @@ val sc = new SparkContext(new SparkConf())
 
 Then, you can supply configuration values at runtime:
 {% highlight bash %}
-./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false 
-  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar 
+./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false
+  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
 {% endhighlight %}
 
 The Spark shell and [`spark-submit`](submitting-applications.html)
@@ -123,7 +123,7 @@ of the most common options to set are:
   <td>
     Limit of total size of serialized results of all partitions for each Spark action (e.g. collect).
     Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size
-    is above this limit. 
+    is above this limit.
     Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory
     and memory overhead of objects in JVM). Setting a proper limit can protect the driver from
     out-of-memory errors.
@@ -218,6 +218,45 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.executor.logs.rolling.strategy</code></td>
+  <td>(none)</td>
+  <td>
+    Set the strategy of rolling of executor logs. By default it is disabled. It can
+    be set to "time" (time-based rolling) or "size" (size-based rolling). For "time",
+    use <code>spark.executor.logs.rolling.time.interval</code> to set the rolling interval.
+    For "size", use <code>spark.executor.logs.rolling.size.maxBytes</code> to set
+    the maximum file size for rolling.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.time.interval</code></td>
+  <td>daily</td>
+  <td>
+    Set the time interval by which the executor logs will be rolled over.
+    Rolling is disabled by default. Valid values are `daily`, `hourly`, `minutely` or
+    any interval in seconds. See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+    for automatic cleaning of old logs.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.size.maxBytes</code></td>
+  <td>(none)</td>
+  <td>
+    Set the max size of the file by which the executor logs will be rolled over.
+    Rolling is disabled by default. Value is set in terms of bytes.
+    See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+    for automatic cleaning of old logs.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.logs.rolling.maxRetainedFiles</code></td>
+  <td>(none)</td>
+  <td>
+    Sets the number of latest rolling log files that are going to be retained by the system.
+    Older log files will be deleted. Disabled by default.
+  </td>
+</tr>
+<tr>
   <td><code>spark.files.userClassPathFirst</code></td>
   <td>false</td>
   <td>
@@ -250,10 +289,11 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.python.profile.dump</code></td>
   <td>(none)</td>
   <td>
-    The directory which is used to dump the profile result before driver exiting. 
+    The directory which is used to dump the profile result before driver exiting.
     The results will be dumped as separated file for each RDD. They can be loaded
     by ptats.Stats(). If this is specified, the profile result will not be displayed
     automatically.
+  </td>
 </tr>
 <tr>
   <td><code>spark.python.worker.reuse</code></td>
@@ -269,8 +309,8 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
   <td>(none)</td>
   <td>
-    Add the environment variable specified by <code>EnvironmentVariableName</code> to the Executor 
-    process. The user can specify multiple of these and to set multiple environment variables. 
+    Add the environment variable specified by <code>EnvironmentVariableName</code> to the Executor
+    process. The user can specify multiple of these and to set multiple environment variables.
   </td>
 </tr>
 <tr>
@@ -475,9 +515,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>
     The codec used to compress internal data such as RDD partitions, broadcast variables and
     shuffle outputs. By default, Spark provides three codecs: <code>lz4</code>, <code>lzf</code>,
-    and <code>snappy</code>. You can also use fully qualified class names to specify the codec, 
-    e.g. 
-    <code>org.apache.spark.io.LZ4CompressionCodec</code>,    
+    and <code>snappy</code>. You can also use fully qualified class names to specify the codec,
+    e.g.
+    <code>org.apache.spark.io.LZ4CompressionCodec</code>,
     <code>org.apache.spark.io.LZFCompressionCodec</code>,
     and <code>org.apache.spark.io.SnappyCompressionCodec</code>.
   </td>
@@ -945,7 +985,7 @@ Apart from these, the following properties are also available, and may be useful
     (resources are executors in yarn mode, CPU cores in standalone mode)
     to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
     Regardless of whether the minimum ratio of resources has been reached,
-    the maximum amount of time it will wait before scheduling begins is controlled by config 
+    the maximum amount of time it will wait before scheduling begins is controlled by config
     <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>.
   </td>
 </tr>
@@ -954,7 +994,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>30000</td>
   <td>
     Maximum amount of time to wait for resources to register before scheduling begins
-    (in milliseconds).  
+    (in milliseconds).
   </td>
 </tr>
 <tr>
@@ -1023,7 +1063,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
     Whether Spark acls should are enabled. If enabled, this checks to see if the user has
-    access permissions to view or modify the job.  Note this requires the user to be known, 
+    access permissions to view or modify the job.  Note this requires the user to be known,
     so if the user comes across as null no checks are done. Filters can be used with the UI
     to authenticate and set the user.
   </td>
@@ -1062,17 +1102,31 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.streaming.blockInterval</code></td>
   <td>200</td>
   <td>
-    Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced
-    into blocks of data before storing them in Spark.
+    Interval (milliseconds) at which data received by Spark Streaming receivers is chunked
+    into blocks of data before storing them in Spark. Minimum recommended - 50 ms. See the
+    <a href="streaming-programming-guide.html#level-of-parallelism-in-data-receiving">performance
+     tuning</a> section in the Spark Streaming programming guide for more details.
   </td>
 </tr>
 <tr>
   <td><code>spark.streaming.receiver.maxRate</code></td>
   <td>infinite</td>
   <td>
-    Maximum rate (per second) at which each receiver will push data into blocks. Effectively,
-    each stream will consume at most this number of records per second.
+    Maximum number of records per second at which each receiver will receive data.
+    Effectively, each stream will consume at most this number of records per second.
     Setting this configuration to 0 or a negative number will put no limit on the rate.
+    See the <a href="streaming-programming-guide.html#deploying-applications">deployment guide</a>
+    in the Spark Streaming programming guide for more details.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.streaming.receiver.writeAheadLogs.enable</code></td>
+  <td>false</td>
+  <td>
+    Enable write ahead logs for receivers. All the input data received through receivers
+    will be saved to write ahead logs that will allow it to be recovered after driver failures.
+    See the <a href="streaming-programming-guide.html#deploying-applications">deployment guide</a>
+    in the Spark Streaming programming guide for more details.
   </td>
 </tr>
 <tr>
@@ -1086,45 +1140,6 @@ Apart from these, the following properties are also available, and may be useful
     higher memory usage in Spark.
   </td>
 </tr>
-<tr>
-  <td><code>spark.executor.logs.rolling.strategy</code></td>
-  <td>(none)</td>
-  <td>
-    Set the strategy of rolling of executor logs. By default it is disabled. It can
-    be set to "time" (time-based rolling) or "size" (size-based rolling). For "time",
-    use <code>spark.executor.logs.rolling.time.interval</code> to set the rolling interval.
-    For "size", use <code>spark.executor.logs.rolling.size.maxBytes</code> to set
-    the maximum file size for rolling.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.executor.logs.rolling.time.interval</code></td>
-  <td>daily</td>
-  <td>
-    Set the time interval by which the executor logs will be rolled over.
-    Rolling is disabled by default. Valid values are `daily`, `hourly`, `minutely` or
-    any interval in seconds. See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
-    for automatic cleaning of old logs.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.executor.logs.rolling.size.maxBytes</code></td>
-  <td>(none)</td>
-  <td>
-    Set the max size of the file by which the executor logs will be rolled over.
-    Rolling is disabled by default. Value is set in terms of bytes.
-    See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
-    for automatic cleaning of old logs.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.executor.logs.rolling.maxRetainedFiles</code></td>
-  <td>(none)</td>
-  <td>
-    Sets the number of latest rolling log files that are going to be retained by the system.
-    Older log files will be deleted. Disabled by default.
-  </td>
-</tr>
 </table>
 
 #### Cluster Managers

http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/streaming-custom-receivers.md
----------------------------------------------------------------------
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 27cd085..6a20481 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -7,25 +7,30 @@ Spark Streaming can receive streaming data from any arbitrary data source beyond
 the ones for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
 This requires the developer to implement a *receiver* that is customized for receiving data from
 the concerned data source. This guide walks through the process of implementing a custom receiver
-and using it in a Spark Streaming application.
+and using it in a Spark Streaming application. Note that custom receivers can be implemented
+in Scala or Java.
 
-### Implementing a Custom Receiver
+## Implementing a Custom Receiver
 
-This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
+This starts with implementing a **Receiver**
+([Scala doc](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver),
+[Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)).
 A custom receiver must extend this abstract class by implementing two methods
+
 - `onStart()`: Things to do to start receiving data.
 - `onStop()`: Things to do to stop receiving data.
 
-Note that `onStart()` and `onStop()` must not block indefinitely. Typically, onStart() would start the threads
+Both `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
 that are responsible for receiving the data and `onStop()` would ensure that receiving by those threads
 is stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
 should stop receiving data.
 
 Once the data is received, that data can be stored inside Spark
-by calling `store(data)`, which is a method provided by the
-[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class.
+by calling `store(data)`, which is a method provided by the Receiver class.
 There are a number of flavours of `store()` which allow you to store the received data
-record-at-a-time or as whole collection of objects / serialized bytes.
+record-at-a-time or as a whole collection of objects / serialized bytes. Note that the flavour of
+`store()` used to implement a receiver affects its reliability and fault-tolerance semantics.
+This is discussed [later](#receiver-reliability) in more detail.
 
 Any exception in the receiving threads should be caught and handled properly to avoid silent
 failures of the receiver. `restart(<exception>)` will restart the receiver by
@@ -158,7 +163,7 @@ public class JavaCustomReceiver extends Receiver<String> {
 </div>
 
 
-### Using the custom receiver in a Spark Streaming application
+## Using the custom receiver in a Spark Streaming application
 
 The custom receiver can be used in a Spark Streaming application by using
 `streamingContext.receiverStream(<instance of custom receiver>)`. This will create
@@ -191,9 +196,68 @@ The full source code is in the example [JavaCustomReceiver.java](https://github.
 </div>
 </div>
 
-
-
-### Implementing and Using a Custom Actor-based Receiver
+## Receiver Reliability
+As discussed in brief in the
+[Spark Streaming Programming Guide](streaming-programming-guide.html#receiver-reliability),
+there are two kinds of receivers based on their reliability and fault-tolerance semantics.
+
+1. *Reliable Receiver* - For *reliable sources* that allow sent data to be acknowledged, a
+  *reliable receiver* correctly acknowledges to the source that the data has been received
+  and stored in Spark reliably (that is, replicated successfully). Usually,
+  implementing this receiver involves careful consideration of the semantics of source
+  acknowledgements.
+1. *Unreliable Receiver* - These are receivers for unreliable sources that do not support
+  acknowledging. Even for reliable sources, one may implement an unreliable receiver that
+  does not go into the complexity of acknowledging correctly.
+
+To implement a *reliable receiver*, you have to use `store(multiple-records)` to store data.
+This flavour of `store` is a blocking call which returns only after all the given records have
+been stored inside Spark. If the receiver's configured storage level uses replication
+(enabled by default), then this call returns after replication has completed.
+Thus it ensures that the data is reliably stored, and the receiver can now acknowledge the
+source appropriately. This ensures that no data is lost when the receiver fails in the middle
+of replicating data -- the buffered data will not be acknowledged and hence will be later resent
+by the source.
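+
+As a rough Scala sketch, the receiving thread inside such a receiver (that is, inside a class
+extending `Receiver[String]`) might look like the following, where `fetchBatch()` and `sendAck()`
+stand in for source-specific calls:
+
+{% highlight scala %}
+import scala.collection.mutable.ArrayBuffer
+
+private def receive() {
+  while (!isStopped()) {
+    val records: ArrayBuffer[String] = fetchBatch()   // hypothetical: pull a batch from the source
+    store(records)    // blocks until the records have been stored (and replicated) inside Spark
+    sendAck(records)  // only now is it safe to acknowledge the source
+  }
+}
+{% endhighlight %}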
+
+An *unreliable receiver* does not have to implement any of this logic. It can simply receive
+records from the source and insert them one-at-a-time using `store(single-record)`. While it does
+not get the reliability guarantees of `store(multiple-records)`, it has the following advantages.
+
+- The system takes care of chunking that data into appropriately sized blocks (look for block
+interval in the [Spark Streaming Programming Guide](streaming-programming-guide.html)).
+- The system takes care of controlling the receiving rates if the rate limits have been specified.
+- Because of these two, unreliable receivers are simpler to implement than reliable receivers.
+
+The following table summarizes the characteristics of both types of receivers.
+
+<table class="table">
+<tr>
+  <th>Receiver Type</th>
+  <th>Characteristics</th>
+</tr>
+<tr>
+  <td><b>Unreliable Receivers</b></td>
+  <td>
+    Simple to implement.<br/>
+    System takes care of block generation and rate control.<br/>
+    No fault-tolerance guarantees, can lose data on receiver failure.
+  </td>
+</tr>
+<tr>
+  <td><b>Reliable Receivers</b></td>
+  <td>
+    Strong fault-tolerance guarantees, can ensure zero data loss.<br/>
+    Block generation and rate control to be handled by the receiver implementation.<br/>
+    Implementation complexity depends on the acknowledgement mechanisms of the source.
+  </td>
+</tr>
+<tr>
+  <td></td>
+  <td></td>
+</tr>
+</table>
+
+## Implementing and Using a Custom Actor-based Receiver
 
 Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
 receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
@@ -203,7 +267,7 @@ trait can be applied on any Akka actor, which allows received data to be stored
 {% highlight scala %}
 class CustomActor extends Actor with ActorHelper {
   def receive = {
-   case data: String => store(data)
+    case data: String => store(data)
   }
 }
 {% endhighlight %}
@@ -217,5 +281,3 @@ val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
 
 See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
 for an end-to-end example.
-
-

http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/streaming-flume-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index d57c3e0..ac01dd3 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -66,9 +66,16 @@ configuring Flume agents.
 
 ## Approach 2 (Experimental): Pull-based Approach using a Custom Sink
 Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following.
+
 - Flume pushes data into the sink, and the data stays buffered.
-- Spark Streaming uses transactions to pull data from the sink. Transactions succeed only after data is received and replicated by Spark Streaming.
-This ensures that better reliability and fault-tolerance than the previous approach. However, this requires configuring Flume to run a custom sink. Here are the configuration steps.
+- Spark Streaming uses a [reliable Flume receiver](streaming-programming-guide.html#receiver-reliability)
+  and transactions to pull data from the sink. Transactions succeed only after data is received and
+  replicated by Spark Streaming.
+
+This ensures stronger reliability and
+[fault-tolerance guarantees](streaming-programming-guide.html#fault-tolerance-semantics)
+than the previous approach. However, this requires configuring Flume to run a custom sink.
+Here are the configuration steps.
 
 #### General Requirements
 Choose a machine that will run the custom sink in a Flume agent. The rest of the Flume pipeline is configured to send data to that agent. Machines in the Spark cluster should have access to the chosen machine running the custom sink.
@@ -104,7 +111,7 @@ See the [Flume's documentation](https://flume.apache.org/documentation.html) for
 configuring Flume agents.
 
 #### Configuring Spark Streaming Application
-1. **Linking:** In your SBT/Maven projrect definition, link your streaming application against the `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide).
+1. **Linking:** In your SBT/Maven project definition, link your streaming application against the `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide).
 
 2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b004150a/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index a3b705d..1c956fc 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -40,3 +40,20 @@ title: Spark Streaming + Kafka Integration Guide
 	- Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.
 
 3. **Deploying:** Package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
+
+Note that the Kafka receiver used by default is an
+[*unreliable* receiver](streaming-programming-guide.html#receiver-reliability) (see the
+receiver reliability section in the programming guide). In Spark 1.2, we have added an
+experimental *reliable* Kafka receiver that provides stronger
+[fault-tolerance guarantees](streaming-programming-guide.html#fault-tolerance-semantics) of zero
+data loss on failures. This receiver is automatically used when the write ahead log
+(also introduced in Spark 1.2) is enabled
+(see the [Deployment](streaming-programming-guide.html#deploying-applications) section in the programming guide). This
+may reduce the receiving throughput of individual Kafka receivers compared to the unreliable
+receivers, but this can be corrected by running
+[more receivers in parallel](streaming-programming-guide.html#level-of-parallelism-in-data-receiving)
+to increase aggregate throughput. Additionally, it is recommended that the replication of the
+received data within Spark be disabled when the write ahead log is enabled as the log is already stored
+in a replicated storage system. This can be done by setting the storage level for the input
+stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
+`KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
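+
+For example, a rough Scala sketch of the resulting setup (the ZooKeeper quorum, group id, topic
+map and checkpoint directory are placeholders):
+
+{% highlight scala %}
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+val conf = new SparkConf()
+  .setAppName("ReliableKafkaExample")                              // placeholder application name
+  .set("spark.streaming.receiver.writeAheadLogs.enable", "true")   // enable the write ahead log
+val ssc = new StreamingContext(conf, Seconds(2))
+ssc.checkpoint("hdfs://namenode:8020/checkpoint/")                 // placeholder checkpoint directory
+
+val kafkaStream = KafkaUtils.createStream(
+  ssc, "zkhost:2181", "my-consumer-group", Map("my-topic" -> 1),
+  StorageLevel.MEMORY_AND_DISK_SER)   // no in-memory replication, the log is already replicated
+{% endhighlight %}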

