You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by ch...@apache.org on 2017/02/21 18:36:59 UTC
incubator-toree git commit: TOREE-387: Always use
SparkSession.builder.getOrCreate.
Repository: incubator-toree
Updated Branches:
refs/heads/master 349927253 -> a739674e0
TOREE-387: Always use SparkSession.builder.getOrCreate.
Keeping a reference to a SparkSession prevents users from restarting the
Spark application in long-running notebooks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/a739674e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/a739674e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/a739674e
Branch: refs/heads/master
Commit: a739674e03622e75012b366e52851a0786613aaf
Parents: 3499272
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Feb 15 16:03:54 2017 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Tue Feb 21 09:36:40 2017 -0800
----------------------------------------------------------------------
.../org/apache/toree/kernel/api/Kernel.scala | 35 +++++++++++---------
1 file changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/a739674e/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
index 087a0fe..eb0c296 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
@@ -19,9 +19,10 @@ package org.apache.toree.kernel.api
import java.io.{InputStream, PrintStream}
import java.util.concurrent.ConcurrentHashMap
+import scala.collection.mutable
import com.typesafe.config.Config
import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.toree.annotations.Experimental
import org.apache.toree.boot.layer.InterpreterManager
@@ -87,11 +88,6 @@ class Kernel (
private val currentErrorKernelMessage =
new DynamicVariable[KernelMessage](null)
- private var _sparkSession: SparkSession = null
- def _sparkContext:SparkContext = _sparkSession.sparkContext
- def _javaSparkContext: JavaSparkContext = new JavaSparkContext(_sparkContext)
- //def _sqlContext = _sparkSession;
-
/**
* Represents magics available through the kernel.
*/
@@ -345,20 +341,20 @@ class Kernel (
override def createSparkContext(conf: SparkConf): SparkContext = {
val sconf = createSparkConf(conf)
- _sparkSession = SparkSession.builder.config(sconf).getOrCreate()
+ val _sparkSession = SparkSession.builder.config(sconf).getOrCreate()
val sparkMaster = sconf.getOption("spark.master").getOrElse("not_set")
logger.info( s"Connecting to spark.master $sparkMaster")
// TODO: Convert to events
- pluginManager.dependencyManager.add(sconf)
+ pluginManager.dependencyManager.add(_sparkSession.sparkContext.getConf)
pluginManager.dependencyManager.add(_sparkSession)
- pluginManager.dependencyManager.add(_sparkContext)
- pluginManager.dependencyManager.add(_javaSparkContext)
+ pluginManager.dependencyManager.add(_sparkSession.sparkContext)
+ pluginManager.dependencyManager.add(javaSparkContext(_sparkSession))
pluginManager.fireEvent(SparkReady)
- _sparkContext
+ _sparkSession.sparkContext
}
override def createSparkContext(
@@ -400,8 +396,17 @@ class Kernel (
interpreterManager.interpreters.get(name)
}
- override def sparkContext: SparkContext = _sparkContext
- override def sparkConf: SparkConf = _sparkSession.sparkContext.getConf
- override def javaSparkContext: JavaSparkContext = _javaSparkContext
- override def sparkSession: SparkSession = _sparkSession
+ override def sparkSession: SparkSession = SparkSession.builder.getOrCreate
+ override def sparkContext: SparkContext = sparkSession.sparkContext
+ override def sparkConf: SparkConf = sparkSession.sparkContext.getConf
+ override def javaSparkContext: JavaSparkContext = javaSparkContext(sparkSession)
+
+ private val javaContexts = new mutable.WeakHashMap[SparkSession, JavaSparkContext]
+ private def javaSparkContext(sparkSession: SparkSession): JavaSparkContext = {
+ javaContexts.synchronized {
+ javaContexts.getOrElseUpdate(
+ sparkSession,
+ new JavaSparkContext(sparkSession.sparkContext))
+ }
+ }
}