You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/04/06 04:57:26 UTC

spark git commit: [SPARK-13211][STREAMING] StreamingContext throws NoSuchElementException when created from non-existent checkpoint directory

Repository: spark
Updated Branches:
  refs/heads/master 7d29c72f6 -> 8e5c1cbf2


[SPARK-13211][STREAMING] StreamingContext throws NoSuchElementException when created from non-existent checkpoint directory

## What changes were proposed in this pull request?

Take 2: avoid the NoSuchElementException thrown by `None.get`, and throw a more descriptive IllegalArgumentException instead, when a non-existent checkpoint directory is used without a SparkContext.

## How was this patch tested?

Jenkins tests, plus a new test for this particular case.

Author: Sean Owen <so...@cloudera.com>

Closes #12174 from srowen/SPARK-13211.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e5c1cbf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e5c1cbf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e5c1cbf

Branch: refs/heads/master
Commit: 8e5c1cbf2c3d5eaa7d9dd35def177414a0d4cf82
Parents: 7d29c72
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Apr 5 19:57:23 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Apr 5 19:57:23 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/Checkpoint.scala    |  3 +--
 .../org/apache/spark/streaming/StreamingContext.scala    | 11 ++++-------
 .../org/apache/spark/streaming/CheckpointSuite.scala     |  5 +++++
 3 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e5c1cbf/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index f9f3d97..5cc677d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -334,8 +334,7 @@ object CheckpointReader extends Logging {
       ignoreReadError: Boolean = false): Option[Checkpoint] = {
     val checkpointPath = new Path(checkpointDir)
 
-    // TODO(rxin): Why is this a def?!
-    def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
+    val fs = checkpointPath.getFileSystem(hadoopConf)
 
     // Try to find the checkpoint files
     val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5c1cbf/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ac37e8e..83a1092 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -106,7 +106,7 @@ class StreamingContext private[streaming] (
    *                   HDFS compatible filesystems
    */
   def this(path: String, hadoopConf: Configuration) =
-    this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).get, null)
+    this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null)
 
   /**
    * Recreate a StreamingContext from a checkpoint file.
@@ -122,15 +122,12 @@ class StreamingContext private[streaming] (
   def this(path: String, sparkContext: SparkContext) = {
     this(
       sparkContext,
-      CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).get,
+      CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).orNull,
       null)
   }
 
-
-  if (_sc == null && _cp == null) {
-    throw new Exception("Spark Streaming cannot be initialized with " +
-      "both SparkContext and checkpoint as null")
-  }
+  require(_sc != null || _cp != null,
+    "Spark Streaming cannot be initialized with both SparkContext and checkpoint as null")
 
   private[streaming] val isCheckpointPresent: Boolean = _cp != null
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5c1cbf/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 9a3248b..fbb25d4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -228,6 +228,11 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     }
   }
 
+  test("non-existent checkpoint dir") {
+    // SPARK-13211
+    intercept[IllegalArgumentException](new StreamingContext("nosuchdirectory"))
+  }
+
   test("basic rdd checkpoints + dstream graph checkpoint recovery") {
 
     assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org