Posted to commits@toree.apache.org by ch...@apache.org on 2017/02/21 18:36:59 UTC

incubator-toree git commit: TOREE-387: Always use SparkSession.builder.getOrCreate.

Repository: incubator-toree
Updated Branches:
  refs/heads/master 349927253 -> a739674e0


TOREE-387: Always use SparkSession.builder.getOrCreate.

Keeping a reference to a SparkSession prevents users from restarting the
Spark application in long-running notebooks.
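To make the failure mode concrete, here is a minimal sketch (not Toree code; the object and val names are illustrative, and it assumes only the Spark 2.x public API). A kernel that caches the session at startup hands out a dead reference once the user stops Spark; resolving the session through getOrCreate on every access returns a live one instead:

    import org.apache.spark.sql.SparkSession

    object RestartSketch {
      def main(args: Array[String]): Unit = {
        // Anti-pattern: cache the session once, as the kernel did before.
        val cached = SparkSession.builder.master("local[*]").getOrCreate()

        cached.stop()  // a user restarts the Spark application mid-notebook

        // `cached` now wraps a stopped SparkContext; using it throws
        // IllegalStateException, and the kernel cannot hand out a working
        // session again.

        // Fix: resolve the session on every access instead of caching it.
        // getOrCreate sees the stopped context and builds a fresh session.
        val live = SparkSession.builder.master("local[*]").getOrCreate()
        println(live.range(10).count())  // 10, on the freshly created session
      }
    }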


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/a739674e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/a739674e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/a739674e

Branch: refs/heads/master
Commit: a739674e03622e75012b366e52851a0786613aaf
Parents: 3499272
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Feb 15 16:03:54 2017 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Tue Feb 21 09:36:40 2017 -0800

----------------------------------------------------------------------
 .../org/apache/toree/kernel/api/Kernel.scala    | 35 +++++++++++---------
 1 file changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/a739674e/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
index 087a0fe..eb0c296 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
@@ -19,9 +19,10 @@ package org.apache.toree.kernel.api
 
 import java.io.{InputStream, PrintStream}
 import java.util.concurrent.ConcurrentHashMap
+import scala.collection.mutable
 import com.typesafe.config.Config
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.toree.annotations.Experimental
 import org.apache.toree.boot.layer.InterpreterManager
@@ -87,11 +88,6 @@ class Kernel (
   private val currentErrorKernelMessage =
     new DynamicVariable[KernelMessage](null)
 
-  private var _sparkSession: SparkSession = null
-  def _sparkContext:SparkContext = _sparkSession.sparkContext
-  def _javaSparkContext: JavaSparkContext = new JavaSparkContext(_sparkContext)
-  //def _sqlContext = _sparkSession;
-
   /**
    * Represents magics available through the kernel.
    */
@@ -345,20 +341,20 @@ class Kernel (
 
   override def createSparkContext(conf: SparkConf): SparkContext = {
     val sconf = createSparkConf(conf)
-    _sparkSession = SparkSession.builder.config(sconf).getOrCreate()
+    val _sparkSession = SparkSession.builder.config(sconf).getOrCreate()
 
     val sparkMaster = sconf.getOption("spark.master").getOrElse("not_set")
     logger.info( s"Connecting to spark.master $sparkMaster")
 
     // TODO: Convert to events
-    pluginManager.dependencyManager.add(sconf)
+    pluginManager.dependencyManager.add(_sparkSession.sparkContext.getConf)
     pluginManager.dependencyManager.add(_sparkSession)
-    pluginManager.dependencyManager.add(_sparkContext)
-    pluginManager.dependencyManager.add(_javaSparkContext)
+    pluginManager.dependencyManager.add(_sparkSession.sparkContext)
+    pluginManager.dependencyManager.add(javaSparkContext(_sparkSession))
 
     pluginManager.fireEvent(SparkReady)
 
-    _sparkContext
+    _sparkSession.sparkContext
   }
 
   override def createSparkContext(
@@ -400,8 +396,17 @@ class Kernel (
     interpreterManager.interpreters.get(name)
   }
 
-  override def sparkContext: SparkContext = _sparkContext
-  override def sparkConf: SparkConf = _sparkSession.sparkContext.getConf
-  override def javaSparkContext: JavaSparkContext = _javaSparkContext
-  override def sparkSession: SparkSession = _sparkSession
+  override def sparkSession: SparkSession = SparkSession.builder.getOrCreate
+  override def sparkContext: SparkContext = sparkSession.sparkContext
+  override def sparkConf: SparkConf = sparkSession.sparkContext.getConf
+  override def javaSparkContext: JavaSparkContext = javaSparkContext(sparkSession)
+
+  private val javaContexts = new mutable.WeakHashMap[SparkSession, JavaSparkContext]
+  private def javaSparkContext(sparkSession: SparkSession): JavaSparkContext = {
+    javaContexts.synchronized {
+      javaContexts.getOrElseUpdate(
+        sparkSession,
+        new JavaSparkContext(sparkSession.sparkContext))
+    }
+  }
 }
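A note on the last hunk: rather than constructing a new JavaSparkContext on every call to the accessor, the patch memoizes one per session. The sketch below restates that pattern standalone, again assuming only the Spark 2.x public API; weak keys mean an entry can be collected together with its session instead of pinning a stopped session in memory:

    import scala.collection.mutable
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.SparkSession

    object JavaContextCache {
      // Weak keys: once a SparkSession is otherwise unreachable, its
      // entry becomes eligible for garbage collection along with it.
      private val cache = new mutable.WeakHashMap[SparkSession, JavaSparkContext]

      def forSession(session: SparkSession): JavaSparkContext =
        cache.synchronized {
          cache.getOrElseUpdate(session, new JavaSparkContext(session.sparkContext))
        }
    }

The synchronized block keeps getOrElseUpdate atomic, since mutable.WeakHashMap is not thread-safe and the kernel API can be reached from several threads at once.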