You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2014/01/11 01:26:11 UTC

[17/20] git commit: Updated docs based on Patrick's comments in PR 383.

Updated docs based on Patrick's comments in PR 383.


Branch: refs/heads/master
Commit: e4bb845238d0df48f8258e925caf9af5a107af46
Parents: 2213a5a
Author: Tathagata Das <>
Authored: Fri Jan 10 12:17:09 2014 -0800
Committer: Tathagata Das <>
Committed: Fri Jan 10 12:17:09 2014 -0800

 .../apache/spark/util/TimeStampedHashMap.scala  |  4 +-
 .../streaming/examples/NetworkWordCount.scala   |  3 +-
 .../examples/RecoverableNetworkWordCount.scala  | 49 +++++++++++++++-----
 .../org/apache/spark/streaming/Checkpoint.scala | 13 ++++--
 .../api/java/JavaStreamingContext.scala         | 14 +++---
 5 files changed, 58 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index dde504f..8e07a0f 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -27,8 +27,8 @@ import org.apache.spark.Logging
  * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
  * timestamp along with each key-value pair. If specified, the timestamp of each pair can be
- * updated every it is accessed. Key-value pairs whose timestamp are older than a particular
- * threshold time can them be removed using the clearOldValues method. This is intended to
+ * updated every time it is accessed. Key-value pairs whose timestamps are older than a particular
+ * threshold time can then be removed using the clearOldValues method. This is intended to
  * be a drop-in replacement of scala.collection.mutable.HashMap.
  * @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
  *                             updated when it is accessed
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index aba1704..4b896ea 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -21,7 +21,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
  * Usage: NetworkWordCount <master> <hostname> <port>
  *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index 739f805..d51e6e9 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -26,18 +26,41 @@ import
 import java.nio.charset.Charset
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-directory>
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
+ * Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
  *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- *   <checkpoint-directory> directory in a Hadoop compatible file system to which checkpoint
- *                          data will be saved to; this must be a fault-tolerant file system
- *                          like HDFS for the system to recover from driver failures
- *   <checkpoint-
+ *   <checkpoint-directory> directory in an HDFS-compatible file system for checkpoint data
+ *   <output-file> file to which the word counts will be appended
+ *
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <checkpoint-directory> and <output-file> must be absolute paths
+ *
+ *
  * To run this on your local machine, you need to first run a Netcat server
- *    `$ nc -lk 9999`
- * and then run the example
- *    `$ ./run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ *
+ *      `$ nc -lk 9999`
+ *
+ * and run the example as
+ *
+ *      `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount \
+ *              local[2] localhost 9999 ~/checkpoint/ ~/out`
+ *
+ * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
+ * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
+ * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
+ * the checkpoint data.
+ *
+ * To run this example in a local standalone cluster with automatic driver recovery,
+ *
+ *      `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> <path-to-examples-jar> \
+ *              org.apache.spark.streaming.examples.RecoverableNetworkWordCount <cluster-url> \
+ *              localhost 9999 ~/checkpoint ~/out`
+ *
+ * <path-to-examples-jar> would typically be <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ *
+ * Refer to the online documentation for more details.
 object RecoverableNetworkWordCount {
@@ -52,7 +75,7 @@ object RecoverableNetworkWordCount {
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
@@ -74,9 +97,13 @@ object RecoverableNetworkWordCount {
           |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
+          |     <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+          |     <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+          |     <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
+          |     <output-file> file to which the word counts will be appended
           |In local mode, <master> should be 'local[n]' with n > 1
-          |Both <checkpoint-directory> and <output-file> should be full paths
+          |Both <checkpoint-directory> and <output-file> must be absolute paths
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 62b2253..1249ef4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,8 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val pendingTimes = ssc.scheduler.getPendingTimes()
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConf = ssc.conf
-  // do not save these configurations
+  // These should be unset when a checkpoint is deserialized,
+  // otherwise the SparkContext won't initialize correctly.
   def validate() {
@@ -102,8 +103,12 @@ object Checkpoint extends Logging {
  * Convenience class to handle the writing of graph checkpoint to file
-class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
-  extends Logging {
+class CheckpointWriter(
+    jobGenerator: JobGenerator,
+    conf: SparkConf,
+    checkpointDir: String,
+    hadoopConf: Configuration
+  ) extends Logging {
   val MAX_ATTEMPTS = 3
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d96e9ac..523173d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -489,15 +489,15 @@ class JavaStreamingContext(val ssc: StreamingContext) {
- * JavaStreamingContext object contains a number of static utility functions.
+ * JavaStreamingContext object contains a number of utility functions.
 object JavaStreamingContext {
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc`.
+   * recreated from the checkpoint data. If the data does not exist, then the provided factory
+   * will be used to create a JavaStreamingContext.
    * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
    * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext
@@ -515,8 +515,8 @@ object JavaStreamingContext {
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc`.
+   * recreated from the checkpoint data. If the data does not exist, then the provided factory
+   * will be used to create a JavaStreamingContext.
    * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
    * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext
@@ -537,8 +537,8 @@ object JavaStreamingContext {
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc`.
+   * recreated from the checkpoint data. If the data does not exist, then the provided factory
+   * will be used to create a JavaStreamingContext.
    * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
    * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext