You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:26:04 UTC

[10/20] git commit: Fixed bugs in reading of checkpoints.

Fixed bugs in reading of checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4a5558ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4a5558ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4a5558ca

Branch: refs/heads/master
Commit: 4a5558ca9921ce89b3996e9ead13b07123fc7a2d
Parents: f1d206c
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Jan 10 03:28:39 2014 +0000
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Jan 10 03:28:39 2014 +0000

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala | 20 ++++++++++++++++----
 .../spark/streaming/StreamingContext.scala      | 17 ++++-------------
 2 files changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a5558ca/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 476ae70..d268b68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -165,16 +165,28 @@ object CheckpointReader extends Logging {
   def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = {
     val checkpointPath = new Path(checkpointDir)
     def fs = checkpointPath.getFileSystem(hadoopConf)
-    val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists)
+
+    // See if the checkpoint directory exists
+    if (!fs.exists(checkpointPath)) {
+      logInfo("Could not load checkpoint as path '" + checkpointPath + "' does not exist")
+      return None
+    }
 
-    // Log the file listing if graph checkpoint file was not found
+    // Try to find the checkpoint data
+    val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists)
     if (existingFiles.isEmpty) {
-      logInfo("Could not find graph file in " + checkpointDir + ", which contains the files:\n" +
-        fs.listStatus(checkpointPath).mkString("\n"))
+      logInfo("Could not load checkpoint as checkpoint data was not " +
+        "found in directory " + checkpointDir)
+      val statuses = fs.listStatus(checkpointPath)
+      if (statuses != null) {
+        logInfo("Checkpoint directory " + checkpointDir + " contains the files:\n" +
+          statuses.mkString("\n"))
+      }
       return None
     }
     logInfo("Checkpoint files found: " + existingFiles.mkString(","))
 
+    // Try to read the checkpoint data
     val compressionCodec = CompressionCodec.createCodec(conf)
     existingFiles.foreach(file => {
       logInfo("Attempting to load checkpoint from file '" + file + "'")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a5558ca/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 76be816..dd34f6f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -496,26 +496,17 @@ object StreamingContext extends Logging {
       hadoopConf: Configuration = new Configuration(),
       createOnError: Boolean = false
     ): StreamingContext = {
-
-    try {
-      CheckpointReader.read(checkpointPath,  new SparkConf(), hadoopConf) match {
-        case Some(checkpoint) =>
-          return new StreamingContext(null, checkpoint, null)
-        case None =>
-          logInfo("Creating new StreamingContext")
-          return creatingFunc()
-      }
+    val checkpointOption = try {
+      CheckpointReader.read(checkpointPath,  new SparkConf(), hadoopConf)
     } catch {
       case e: Exception =>
         if (createOnError) {
-          logWarning("Error reading checkpoint", e)
-          logInfo("Creating new StreamingContext")
-          return creatingFunc()
+          None
         } else {
-          logError("Error reading checkpoint", e)
           throw e
         }
     }
+    checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
   }
 
   /**