Posted to commits@spark.apache.org by td...@apache.org on 2015/05/13 01:44:18 UTC

spark git commit: [SPARK-7553] [STREAMING] Added methods to maintain a singleton StreamingContext

Repository: spark
Updated Branches:
  refs/heads/master 96c4846db -> 00e7b09a0


[SPARK-7553] [STREAMING] Added methods to maintain a singleton StreamingContext

In a REPL/notebook environment, it's very easy to lose the reference to a StreamingContext by overwriting the variable that holds it. For example, if you happen to execute the following commands:
```
val ssc = new StreamingContext(...) // cmd 1
ssc.start() // cmd 2
...
val ssc = new StreamingContext(...) // accidentally run cmd 1 again
```
The value of `ssc` will be overwritten. Now you can neither start the new context (as only one context can be started at a time), nor stop the previous context (as the reference to it is lost).
Hence it is best to maintain a singleton reference to the active context, so that the reference is never lost.
Since this problem arises mainly in REPL environments, it is best to add this as Experimental support in the Scala API only, so that it can be used in Scala REPLs and notebooks.
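
A minimal sketch of the intended usage in a REPL, assuming `sc` is the REPL's existing SparkContext and using an illustrative one-second batch interval:
```
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create a context only if none is currently active (started but not stopped).
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(1)))
// ... define input DStreams and output operations on ssc before starting ...
ssc.start()

// Later, even if the `ssc` variable has been overwritten, the active context
// can still be looked up and stopped through the singleton reference.
StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
```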

Author: Tathagata Das <ta...@gmail.com>

Closes #6070 from tdas/SPARK-7553 and squashes the following commits:

731c9a1 [Tathagata Das] Fixed style
a797171 [Tathagata Das] Added more unit tests
19fc70b [Tathagata Das] Added :: Experimental :: in docs
64706c9 [Tathagata Das] Fixed test
634db5d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7553
3884a25 [Tathagata Das] Fixing test bug
d37a846 [Tathagata Das] Added getActive and getActiveOrCreate


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00e7b09a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00e7b09a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00e7b09a

Branch: refs/heads/master
Commit: 00e7b09a0bee2fcfd0ce34992bd26435758daf26
Parents: 96c4846
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue May 12 16:44:14 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue May 12 16:44:14 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      |  61 +++++++-
 .../spark/streaming/StreamingContextSuite.scala | 152 ++++++++++++++++++-
 2 files changed, 202 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00e7b09a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8461e90..407cab4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -637,8 +637,10 @@ class StreamingContext private[streaming] (
  */
 
 object StreamingContext extends Logging {
+
   /**
-   * Lock that guards access to global variables that track active StreamingContext.
+   * Lock that guards activation of a StreamingContext as well as access to the singleton active
+   * StreamingContext in getActiveOrCreate().
    */
   private val ACTIVATION_LOCK = new Object()
 
@@ -661,6 +663,18 @@ object StreamingContext extends Logging {
     }
   }
 
+  /**
+   * :: Experimental ::
+   *
+   * Get the currently active context, if there is one. Active means started but not stopped.
+   */
+  @Experimental
+  def getActive(): Option[StreamingContext] = {
+    ACTIVATION_LOCK.synchronized {
+      Option(activeContext.get())
+    }
+  }
+
   @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
@@ -670,6 +684,48 @@ object StreamingContext extends Logging {
   }
 
   /**
+   * :: Experimental ::
+   *
+   * Either return the "active" StreamingContext (that is, started but not stopped), or create a
+   * new StreamingContext by calling the provided `creatingFunc`.
+   * @param creatingFunc   Function to create a new StreamingContext
+   */
+  @Experimental
+  def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext = {
+    ACTIVATION_LOCK.synchronized {
+      getActive().getOrElse { creatingFunc() }
+    }
+  }
+
+  /**
+   * :: Experimental ::
+   *
+   * Either get the currently active StreamingContext (that is, started but not stopped),
+   * OR recreate a StreamingContext from checkpoint data in the given path. If checkpoint data
+   * does not exist in the provided `checkpointPath`, then create a new StreamingContext by
+   * calling the provided `creatingFunc`.
+   *
+   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
+   * @param creatingFunc   Function to create a new StreamingContext
+   * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
+   *                       file system
+   * @param createOnError  Optional, whether to create a new StreamingContext if there is an
+   *                       error in reading checkpoint data. By default, an exception will be
+   *                       thrown on error.
+   */
+  @Experimental
+  def getActiveOrCreate(
+      checkpointPath: String,
+      creatingFunc: () => StreamingContext,
+      hadoopConf: Configuration = new Configuration(),
+      createOnError: Boolean = false
+    ): StreamingContext = {
+    ACTIVATION_LOCK.synchronized {
+      getActive().getOrElse { getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) }
+    }
+  }
+
+  /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
    * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
@@ -694,7 +750,6 @@ object StreamingContext extends Logging {
     checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
   }
 
-
   /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
@@ -761,7 +816,7 @@ object StreamingContext extends Logging {
     ): SparkContext = {
     val conf = SparkContext.updatedConf(
       new SparkConf(), master, appName, sparkHome, jars, environment)
-    createNewSparkContext(conf)
+    new SparkContext(conf)
   }
 
   private[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/00e7b09a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 4729951..5d09b23 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -41,6 +41,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   val batchDuration = Milliseconds(500)
   val sparkHome = "someDir"
   val envPair = "key" -> "value"
+  val conf = new SparkConf().setMaster(master).setAppName(appName)
 
   var sc: SparkContext = null
   var ssc: StreamingContext = null
@@ -390,23 +391,23 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
       assert(newContextCreated, "new context not created")
     }
 
-    val corrutedCheckpointPath = createCorruptedCheckpoint()
+    val corruptedCheckpointPath = createCorruptedCheckpoint()
 
     // getOrCreate should throw exception with fake checkpoint file and createOnError = false
     intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(corrutedCheckpointPath, creatingFunction _)
+      ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
     }
 
     // getOrCreate should throw exception with fake checkpoint file
     intercept[Exception] {
       ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, createOnError = false)
+        corruptedCheckpointPath, creatingFunction _, createOnError = false)
     }
 
     // getOrCreate should create new context with fake checkpoint file and createOnError = true
     testGetOrCreate {
       ssc = StreamingContext.getOrCreate(
-        corrutedCheckpointPath, creatingFunction _, createOnError = true)
+        corruptedCheckpointPath, creatingFunction _, createOnError = true)
       assert(ssc != null, "no context created")
       assert(newContextCreated, "new context not created")
     }
@@ -491,8 +492,145 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     }
   }
 
+  test("getActive and getActiveOrCreate") {
+    require(StreamingContext.getActive().isEmpty, "context exists from before")
+    sc = new SparkContext(conf)
+
+    var newContextCreated = false
+
+    def creatingFunc(): StreamingContext = {
+      newContextCreated = true
+      val newSsc = new StreamingContext(sc, batchDuration)
+      val input = addInputStream(newSsc)
+      input.foreachRDD { rdd => rdd.count }
+      newSsc
+    }
+
+    def testGetActiveOrCreate(body: => Unit): Unit = {
+      newContextCreated = false
+      try {
+        body
+      } finally {
+        if (ssc != null) {
+          ssc.stop(stopSparkContext = false)
+        }
+        ssc = null
+      }
+    }
+
+    // getActiveOrCreate should create new context and getActive should return it only
+    // after starting the context
+    testGetActiveOrCreate {
+      ssc = StreamingContext.getActiveOrCreate(creatingFunc _)
+      assert(ssc != null, "no context created")
+      assert(newContextCreated === true, "new context not created")
+      assert(StreamingContext.getActive().isEmpty,
+        "new initialized context returned before starting")
+      ssc.start()
+      assert(StreamingContext.getActive() === Some(ssc),
+        "active context not returned")
+      assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc,
+        "active context not returned")
+      ssc.stop()
+      assert(StreamingContext.getActive().isEmpty,
+        "inactive context returned")
+      assert(StreamingContext.getActiveOrCreate(creatingFunc _) !== ssc,
+        "inactive context returned")
+    }
+
+    // getActiveOrCreate and getActive should return independently created context after activating
+    testGetActiveOrCreate {
+      ssc = creatingFunc()  // Create
+      assert(StreamingContext.getActive().isEmpty,
+        "new initialized context returned before starting")
+      ssc.start()
+      assert(StreamingContext.getActive() === Some(ssc),
+        "active context not returned")
+      assert(StreamingContext.getActiveOrCreate(creatingFunc _) === ssc,
+        "active context not returned")
+      ssc.stop()
+      assert(StreamingContext.getActive().isEmpty,
+        "inactive context returned")
+    }
+  }
+
+  test("getActiveOrCreate with checkpoint") {
+    // Function to create a new StreamingContext, recording that a new context was created
+    var newContextCreated = false
+    def creatingFunction(): StreamingContext = {
+      newContextCreated = true
+      new StreamingContext(conf, batchDuration)
+    }
+
+    // Run a block of test code, then stop and clear ssc
+    def testGetActiveOrCreate(body: => Unit): Unit = {
+      require(StreamingContext.getActive().isEmpty) // no active context
+      newContextCreated = false
+      try {
+        body
+      } finally {
+        if (ssc != null) {
+          ssc.stop()
+        }
+        ssc = null
+      }
+    }
+
+    val emptyPath = Utils.createTempDir().getAbsolutePath()
+    val corruptedCheckpointPath = createCorruptedCheckpoint()
+    val checkpointPath = createValidCheckpoint()
+
+    // getActiveOrCreate should return the current active context if there is one
+    testGetActiveOrCreate {
+      ssc = new StreamingContext(
+        conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"), batchDuration)
+      addInputStream(ssc).register()
+      ssc.start()
+      val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+      assert(!newContextCreated, "new context created instead of returning")
+      assert(returnedSsc.eq(ssc), "returned context is not the activated context")
+    }
+
+    // getActiveOrCreate should create new context with empty path
+    testGetActiveOrCreate {
+      ssc = StreamingContext.getActiveOrCreate(emptyPath, creatingFunction _)
+      assert(ssc != null, "no context created")
+      assert(newContextCreated, "new context not created")
+    }
+
+    // getActiveOrCreate should throw exception with fake checkpoint file (default createOnError)
+    intercept[Exception] {
+      ssc = StreamingContext.getActiveOrCreate(corruptedCheckpointPath, creatingFunction _)
+    }
+
+    // getActiveOrCreate should throw exception with fake checkpoint file and
+    // createOnError = false
+    intercept[Exception] {
+      ssc = StreamingContext.getActiveOrCreate(
+        corruptedCheckpointPath, creatingFunction _, createOnError = false)
+    }
+
+    // getActiveOrCreate should create new context with fake
+    // checkpoint file and createOnError = true
+    testGetActiveOrCreate {
+      ssc = StreamingContext.getActiveOrCreate(
+        corruptedCheckpointPath, creatingFunction _, createOnError = true)
+      assert(ssc != null, "no context created")
+      assert(newContextCreated, "new context not created")
+    }
+
+    // getActiveOrCreate should recover context with checkpoint path, and recover old configuration
+    testGetActiveOrCreate {
+      ssc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+      assert(ssc != null, "no context created")
+      assert(!newContextCreated, "old context not recovered")
+      assert(ssc.conf.get("someKey") === "someValue")
+    }
+  }
+
   test("multiple streaming contexts") {
-    sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
+    sc = new SparkContext(
+      conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"))
     ssc = new StreamingContext(sc, Seconds(1))
     val input = addInputStream(ssc)
     input.foreachRDD { rdd => rdd.count }
@@ -522,9 +660,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   def createValidCheckpoint(): String = {
     val testDirectory = Utils.createTempDir().getAbsolutePath()
     val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
-    val conf = new SparkConf().setMaster(master).setAppName(appName)
-    conf.set("someKey", "someValue")
-    ssc = new StreamingContext(conf, batchDuration)
+    ssc = new StreamingContext(conf.clone.set("someKey", "someValue"), batchDuration)
     ssc.checkpoint(checkpointDirectory)
     ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() }
     ssc.start()

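
For reference, a minimal sketch of how the checkpoint-aware overload above can be used in a driver program; the checkpoint directory, master, app name, and batch interval here are illustrative assumptions, not part of this commit:
```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/streaming-checkpoint"  // hypothetical path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointedApp")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // ... define input DStreams and output operations here ...
  ssc
}

// Order of precedence: (1) return the active context if one is running,
// (2) otherwise recover a context from checkpointDir if checkpoint data exists,
// (3) otherwise fall back to createContext().
val ssc = StreamingContext.getActiveOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```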

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org