You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/04/18 03:29:13 UTC

spark git commit: [SPARK-6703][Core] Provide a way to discover existing SparkContext's

Repository: spark
Updated Branches:
  refs/heads/master a452c5921 -> c5ed51013


[SPARK-6703][Core] Provide a way to discover existing SparkContext's

I've added a static getOrCreate method to the static SparkContext object that allows one to either retrieve a previously created SparkContext or to instantiate a new one with the provided config. The method accepts an optional SparkConf to make usage intuitive.

Still working on a test for this; basically, I want to create a new context from scratch, then ensure that subsequent calls don't overwrite it.

Author: Ilya Ganelin <il...@capitalone.com>

Closes #5501 from ilganeli/SPARK-6703 and squashes the following commits:

db9a963 [Ilya Ganelin] Closing second spark context
1dc0444 [Ilya Ganelin] Added ref equality check
8c884fa [Ilya Ganelin] Made getOrCreate synchronized
cb0c6b7 [Ilya Ganelin] Doc updates and code cleanup
270cfe3 [Ilya Ganelin] [SPARK-6703] Documentation fixes
15e8dea [Ilya Ganelin] Updated comments and added MiMa Exclude
0e1567c [Ilya Ganelin] Got rid of unnecessary option for AtomicReference
dfec4da [Ilya Ganelin] Changed activeContext to AtomicReference
733ec9f [Ilya Ganelin] Fixed some bugs in test code
8be2f83 [Ilya Ganelin] Replaced match with if
e92caf7 [Ilya Ganelin] [SPARK-6703] Added test to ensure that getOrCreate both allows creation, retrieval, and a second context if desired
a99032f [Ilya Ganelin] Spacing fix
d7a06b8 [Ilya Ganelin] Updated SparkConf class to add getOrCreate method. Started test suite implementation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5ed5101
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5ed5101
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5ed5101

Branch: refs/heads/master
Commit: c5ed510135aee3a1a0402057b3b5229892aa6f3a
Parents: a452c59
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Fri Apr 17 18:28:42 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Apr 17 18:28:42 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 49 +++++++++++++++++---
 .../org/apache/spark/SparkContextSuite.scala    | 20 ++++++++
 project/MimaExcludes.scala                      |  4 ++
 3 files changed, 66 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c5ed5101/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e106c5c..86269ea 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -23,7 +23,7 @@ import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Properties, UUID}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicInteger}
 import java.util.UUID.randomUUID
 
 import scala.collection.{Map, Set}
@@ -1887,11 +1887,12 @@ object SparkContext extends Logging {
   private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
 
   /**
-   * The active, fully-constructed SparkContext.  If no SparkContext is active, then this is `None`.
+   * The active, fully-constructed SparkContext.  If no SparkContext is active, then this is `null`.
    *
-   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
+   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
    */
-  private var activeContext: Option[SparkContext] = None
+  private val activeContext: AtomicReference[SparkContext] = 
+    new AtomicReference[SparkContext](null)
 
   /**
    * Points to a partially-constructed SparkContext if some thread is in the SparkContext
@@ -1926,7 +1927,8 @@ object SparkContext extends Logging {
           logWarning(warnMsg)
         }
 
-        activeContext.foreach { ctx =>
+        if (activeContext.get() != null) {
+          val ctx = activeContext.get()
           val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
             " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
             s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
@@ -1942,6 +1944,39 @@ object SparkContext extends Logging {
   }
 
   /**
+   * This function may be used to get or instantiate a SparkContext and register it as a 
+   * singleton object. Because we can only have one active SparkContext per JVM, 
+   * this is useful when applications may wish to share a SparkContext. 
+   *
+   * Note: This function cannot be used to create multiple SparkContext instances 
+   * even if multiple contexts are allowed.
+   */
+  def getOrCreate(config: SparkConf): SparkContext = {
+    // Synchronize to ensure that multiple create requests don't trigger an exception
+    // from assertNoOtherContextIsRunning within setActiveContext
+    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+      if (activeContext.get() == null) {
+        setActiveContext(new SparkContext(config), allowMultipleContexts = false)
+      }
+      activeContext.get()
+    }
+  }
+  
+  /**
+   * This function may be used to get or instantiate a SparkContext and register it as a 
+   * singleton object. Because we can only have one active SparkContext per JVM, 
+   * this is useful when applications may wish to share a SparkContext.
+   * 
+   * This method allows not passing a SparkConf (useful if just retrieving).
+   * 
+   * Note: This function cannot be used to create multiple SparkContext instances 
+   * even if multiple contexts are allowed. 
+   */ 
+  def getOrCreate(): SparkContext = {
+    getOrCreate(new SparkConf())
+  }
+
+  /**
    * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
    * running.  Throws an exception if a running context is detected and logs a warning if another
    * thread is constructing a SparkContext.  This warning is necessary because the current locking
@@ -1967,7 +2002,7 @@ object SparkContext extends Logging {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       assertNoOtherContextIsRunning(sc, allowMultipleContexts)
       contextBeingConstructed = None
-      activeContext = Some(sc)
+      activeContext.set(sc)
     }
   }
 
@@ -1978,7 +2013,7 @@ object SparkContext extends Logging {
    */
   private[spark] def clearActiveContext(): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      activeContext = None
+      activeContext.set(null)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c5ed5101/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 94be1c6..728558a 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -67,6 +67,26 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("Test getOrCreate") {
+    var sc2: SparkContext = null
+    SparkContext.clearActiveContext()
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    
+    sc = SparkContext.getOrCreate(conf)
+    
+    assert(sc.getConf.get("spark.app.name").equals("test"))
+    sc2 = SparkContext.getOrCreate(new SparkConf().setAppName("test2").setMaster("local"))
+    assert(sc2.getConf.get("spark.app.name").equals("test"))
+    assert(sc === sc2)
+    assert(sc eq sc2)
+    
+    // Try creating second context to confirm that it's still possible, if desired
+    sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local")
+        .set("spark.driver.allowMultipleContexts", "true"))
+    
+    sc2.stop()
+  }
+  
   test("BytesWritable implicit conversion is correct") {
     // Regression test for SPARK-3121
     val bytesWritable = new BytesWritable()

http://git-wip-us.apache.org/repos/asf/spark/blob/c5ed5101/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1564bab..7ef363a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -68,6 +68,10 @@ object MimaExcludes {
             // SPARK-6693 add tostring with max lines and width for matrix
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.mllib.linalg.Matrix.toString")
+          )++ Seq(
+            // SPARK-6703 Add getOrCreate method to SparkContext
+            ProblemFilters.exclude[IncompatibleResultTypeProblem]
+                ("org.apache.spark.SparkContext.org$apache$spark$SparkContext$$activeContext")
           )
 
         case v if v.startsWith("1.3") =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org