Posted to commits@spark.apache.org by td...@apache.org on 2015/05/14 02:33:18 UTC

spark git commit: [SPARK-6752] [STREAMING] [REVISED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext

Repository: spark
Updated Branches:
  refs/heads/master 59aaa1dad -> bce00dac4


[SPARK-6752] [STREAMING] [REVISED] Allow StreamingContext to be recreated from checkpoint and existing SparkContext

This is a revision of the earlier version (see #5773), which passed the active SparkContext explicitly through a new set of Java and Scala APIs. The drawbacks of that approach were:

* Hard to implement in Python.
* It introduced new API, which is even more confusing since we are also introducing getActiveOrCreate in SPARK-7553.

Furthermore, there is now a direct way to get the existing active SparkContext or create a new one: SparkContext.getOrCreate(conf). It is better to use this to obtain the SparkContext than to add a new API for explicitly passing the context.
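
A rough sketch of that call (the conf settings here are illustrative, not part of this change):

    import org.apache.spark.{SparkConf, SparkContext}

    // Reuses the SparkContext that is already active in this JVM, if any;
    // otherwise creates a new one from the given conf.
    val conf = new SparkConf().setAppName("example")
    val sc = SparkContext.getOrCreate(conf)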

So in this PR I have:
* Removed the new versions of StreamingContext.getOrCreate() that took a SparkContext
* Added the ability to pick up an existing SparkContext when the StreamingContext recreated from checkpoint data needs a SparkContext (see the sketch below)
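
For illustration, a minimal sketch of the driver pattern this enables; the checkpoint directory, app name, and batch interval below are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "/tmp/streaming-checkpoint"   // hypothetical path
    val conf = new SparkConf().setAppName("recoverable-example")

    // A SparkContext that is already active in this driver JVM.
    val sc = SparkContext.getOrCreate(conf)

    // Used only when no checkpoint data exists yet; builds on the existing context.
    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(1))   // illustrative batch interval
      ssc.checkpoint(checkpointDir)
      ssc
    }

    // If checkpoint data exists, the StreamingContext is recreated from it and,
    // with this change, reuses the already-active SparkContext above
    // (via SparkContext.getOrCreate) instead of constructing a second one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()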

Author: Tathagata Das <ta...@gmail.com>

Closes #6096 from tdas/SPARK-6752 and squashes the following commits:

53f4b2d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-6752
f024b77 [Tathagata Das] Removed extra API and used SparkContext.getOrCreate


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bce00dac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bce00dac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bce00dac

Branch: refs/heads/master
Commit: bce00dac403d3be2be59218b7b93a56c34c68f1a
Parents: 59aaa1d
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed May 13 17:33:15 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed May 13 17:33:15 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      | 49 +-------------
 .../api/java/JavaStreamingContext.scala         | 45 -------------
 .../apache/spark/streaming/JavaAPISuite.java    | 25 +------
 .../spark/streaming/StreamingContextSuite.scala | 70 ++------------------
 4 files changed, 9 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bce00dac/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 407cab4..1d2ecdd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -134,7 +134,7 @@ class StreamingContext private[streaming] (
     if (sc_ != null) {
       sc_
     } else if (isCheckpointPresent) {
-      new SparkContext(cp_.createSparkConf())
+      SparkContext.getOrCreate(cp_.createSparkConf())
     } else {
       throw new SparkException("Cannot create StreamingContext without a SparkContext")
     }
@@ -751,53 +751,6 @@ object StreamingContext extends Logging {
   }
 
   /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
-   * that the SparkConf configuration in the checkpoint data will not be restored as the
-   * SparkContext has already been created.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new StreamingContext using the given SparkContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: SparkContext => StreamingContext,
-      sparkContext: SparkContext
-    ): StreamingContext = {
-    getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc` on the provided `sparkContext`. Note
-   * that the SparkConf configuration in the checkpoint data will not be restored as the
-   * SparkContext has already been created.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new StreamingContext using the given SparkContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   * @param createOnError  Whether to create a new StreamingContext if there is an
-   *                       error in reading checkpoint data. By default, an exception will be
-   *                       thrown on error.
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: SparkContext => StreamingContext,
-      sparkContext: SparkContext,
-      createOnError: Boolean
-    ): StreamingContext = {
-    val checkpointOption = CheckpointReader.read(
-      checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
-    checkpointOption.map(new StreamingContext(sparkContext, _, null))
-                    .getOrElse(creatingFunc(sparkContext))
-  }
-
-  /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/bce00dac/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d8fbed2..b639b94 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -805,51 +805,6 @@ object JavaStreamingContext {
   }
 
   /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided factory
-   * will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new JavaStreamingContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
-      sparkContext: JavaSparkContext
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
-      creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
-    }, sparkContext.sc)
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided factory
-   * will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc   Function to create a new JavaStreamingContext
-   * @param sparkContext   SparkContext using which the StreamingContext will be created
-   * @param createOnError  Whether to create a new JavaStreamingContext if there is an
-   *                       error in reading checkpoint data.
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
-      sparkContext: JavaSparkContext,
-      createOnError: Boolean
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
-      creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
-    }, sparkContext.sc, createOnError)
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/bce00dac/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2e00b98..1077b1b 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1766,29 +1766,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertTrue("old context not recovered", !newContextCreated.get());
     ssc.stop();
 
-    // Function to create JavaStreamingContext using existing JavaSparkContext
-    // without any output operations (used to detect the new context)
-    Function<JavaSparkContext, JavaStreamingContext> creatingFunc2 =
-        new Function<JavaSparkContext, JavaStreamingContext>() {
-          public JavaStreamingContext call(JavaSparkContext context) {
-            newContextCreated.set(true);
-            return new JavaStreamingContext(context, Seconds.apply(1));
-          }
-        };
-
-    JavaSparkContext sc = new JavaSparkContext(conf);
-    newContextCreated.set(false);
-    ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc2, sc);
-    Assert.assertTrue("new context not created", newContextCreated.get());
-    ssc.stop(false);
-
     newContextCreated.set(false);
-    ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc2, sc, true);
-    Assert.assertTrue("new context not created", newContextCreated.get());
-    ssc.stop(false);
-
-    newContextCreated.set(false);
-    ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc2, sc);
+    JavaSparkContext sc = new JavaSparkContext(conf);
+    ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+        new org.apache.hadoop.conf.Configuration());
     Assert.assertTrue("old context not recovered", !newContextCreated.get());
     ssc.stop();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bce00dac/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5f93332..4b12aff 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -419,76 +419,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
       ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
       assert(ssc != null, "no context created")
       assert(!newContextCreated, "old context not recovered")
-      assert(ssc.conf.get("someKey") === "someValue")
-    }
-  }
-
-  test("getOrCreate with existing SparkContext") {
-    val conf = new SparkConf().setMaster(master).setAppName(appName)
-    sc = new SparkContext(conf)
-
-    // Function to create StreamingContext that has a config to identify it to be new context
-    var newContextCreated = false
-    def creatingFunction(sparkContext: SparkContext): StreamingContext = {
-      newContextCreated = true
-      new StreamingContext(sparkContext, batchDuration)
-    }
-
-    // Call ssc.stop(stopSparkContext = false) after a body of cody
-    def testGetOrCreate(body: => Unit): Unit = {
-      newContextCreated = false
-      try {
-        body
-      } finally {
-        if (ssc != null) {
-          ssc.stop(stopSparkContext = false)
-        }
-        ssc = null
-      }
-    }
-
-    val emptyPath = Utils.createTempDir().getAbsolutePath()
-
-    // getOrCreate should create new context with empty path
-    testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _, sc, createOnError = true)
-      assert(ssc != null, "no context created")
-      assert(newContextCreated, "new context not created")
-      assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
+      assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
     }
 
-    val corrutedCheckpointPath = createCorruptedCheckpoint()
-
-    // getOrCreate should throw exception with fake checkpoint file and createOnError = false
-    intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _, sc)
-    }
-
-    // getOrCreate should throw exception with fake checkpoint file
-    intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, sc, createOnError = false)
-    }
-
-    // getOrCreate should create new context with fake checkpoint file and createOnError = true
-    testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, sc, createOnError = true)
-      assert(ssc != null, "no context created")
-      assert(newContextCreated, "new context not created")
-      assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
-    }
-
-    val checkpointPath = createValidCheckpoint()
-
-    // StreamingContext.getOrCreate should recover context with checkpoint path
+    // getOrCreate should recover StreamingContext with existing SparkContext
     testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _, sc)
+      sc = new SparkContext(conf)
+      ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
       assert(ssc != null, "no context created")
       assert(!newContextCreated, "old context not recovered")
-      assert(ssc.sparkContext === sc, "new StreamingContext does not use existing SparkContext")
-      assert(!ssc.conf.contains("someKey"),
-        "recovered StreamingContext unexpectedly has old config")
+      assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
     }
   }
 

