You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by ch...@apache.org on 2016/03/31 16:13:12 UTC
[2/6] incubator-toree git commit: Refactoring some unnecessary
methods for binding contexts
Refactoring some unnecessary methods for binding contexts
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/5fe5245a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/5fe5245a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/5fe5245a
Branch: refs/heads/master
Commit: 5fe5245a3ed6a6c6bbcdf4c062fed8bdedb9059e
Parents: bbf2866
Author: Gino Bustelo <lb...@apache.org>
Authored: Mon Mar 28 13:38:13 2016 -0500
Committer: Gino Bustelo <lb...@apache.org>
Committed: Wed Mar 30 11:42:42 2016 -0500
----------------------------------------------------------------------
.../apache/toree/interpreter/Interpreter.scala | 14 ----
.../toree/interpreter/broker/BrokerBridge.scala | 4 --
.../producer/JavaSparkContextProducerLike.scala | 43 -----------
.../producer/SQLContextProducerLike.scala | 43 -----------
.../interpreter/broker/BrokerBridgeSpec.scala | 5 --
.../boot/layer/ComponentInitialization.scala | 15 ++--
.../toree/boot/layer/InterpreterManager.scala | 31 ++++++--
.../org/apache/toree/kernel/api/Kernel.scala | 20 ------
.../InterpreterActorSpecForIntegration.scala | 2 +-
.../PostProcessorSpecForIntegration.scala | 2 +-
.../interpreter/pyspark/PySparkBridge.scala | 6 +-
.../pyspark/PySparkInterpreter.scala | 9 +--
.../interpreter/scala/ScalaInterpreter.scala | 76 +++++++++-----------
.../AddExternalJarMagicSpecForIntegration.scala | 2 +-
.../interpreter/sparkr/SparkRBridge.scala | 6 +-
.../interpreter/sparkr/SparkRInterpreter.scala | 8 ---
.../interpreter/sparkr/SparkRService.scala | 6 +-
.../kernel/interpreter/sql/SqlInterpreter.scala | 14 +---
18 files changed, 76 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala
index 9a0ffcb..67bd889 100644
--- a/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala
+++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala
@@ -82,20 +82,6 @@ trait Interpreter {
def doQuietly[T](body: => T): T
/**
- * Binds the SparkContext instance to the interpreter's namespace.
- *
- * @param sparkContext The SparkContext to bind
- */
- def bindSparkContext(sparkContext: SparkContext): Unit
-
- /**
- * Binds the SQLContext instance to the interpreter's namespace.
- *
- * @param sqlContext The SQLContext to bind
- */
- def bindSqlContext(sqlContext: SQLContext): Unit
-
- /**
* Binds a variable in the interpreter to a value.
* @param variableName The name to expose the value in the interpreter
* @param typeName The type of the variable, must be the fully qualified class name
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerBridge.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerBridge.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerBridge.scala
index 1c54495..202ee0b 100644
--- a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerBridge.scala
+++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerBridge.scala
@@ -17,11 +17,7 @@
package org.apache.toree.interpreter.broker
-import org.apache.toree.interpreter.broker.producer.{SQLContextProducerLike, JavaSparkContextProducerLike}
import org.apache.toree.kernel.api.KernelLike
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.{SparkConf, SparkContext}
/**
* Represents the API available to the broker to act as the bridge for data
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/JavaSparkContextProducerLike.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/JavaSparkContextProducerLike.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/JavaSparkContextProducerLike.scala
deleted file mode 100644
index 610b319..0000000
--- a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/JavaSparkContextProducerLike.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.toree.interpreter.broker.producer
-
-import org.apache.spark.SparkContext
-import org.apache.spark.api.java.JavaSparkContext
-
-/**
- * Represents a producer for a JavaSparkContext.
- */
-trait JavaSparkContextProducerLike {
- /**
- * Creates a new JavaSparkContext instance.
- *
- * @param sparkContext The SparkContext instance to use to create the Java one
- *
- * @return The new JavaSparkContext
- */
- def newJavaSparkContext(sparkContext: SparkContext): JavaSparkContext
-}
-
-/**
- * Represents the standard producer for a JavaSparkContext.
- */
-trait StandardJavaSparkContextProducer extends JavaSparkContextProducerLike {
- def newJavaSparkContext(sparkContext: SparkContext): JavaSparkContext =
- new JavaSparkContext(sparkContext)
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/SQLContextProducerLike.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/SQLContextProducerLike.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/SQLContextProducerLike.scala
deleted file mode 100644
index 4a78c2b..0000000
--- a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/SQLContextProducerLike.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.toree.interpreter.broker.producer
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-
-/**
- * Represents a producer for a SQLContext.
- */
-trait SQLContextProducerLike {
- /**
- * Creates a new SQLContext instance.
- *
- * @param sparkContext The SparkContext instance to use to create the SQL one
- *
- * @return The new SQLContext
- */
- def newSQLContext(sparkContext: SparkContext): SQLContext
-}
-
-/**
- * Represents the standard producer for a SQLContext.
- */
-trait StandardSQLContextProducer extends SQLContextProducerLike {
- def newSQLContext(sparkContext: SparkContext): SQLContext =
- new SQLContext(sparkContext)
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerBridgeSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerBridgeSpec.scala b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerBridgeSpec.scala
index 8ab4959..26cfca8 100644
--- a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerBridgeSpec.scala
+++ b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerBridgeSpec.scala
@@ -16,14 +16,9 @@
*/
package org.apache.toree.interpreter.broker
-import org.apache.toree.interpreter.broker.producer.{SQLContextProducerLike, JavaSparkContextProducerLike}
import org.apache.toree.kernel.api.KernelLike
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FunSpec, Matchers, OneInstancePerTest}
-import org.mockito.Mockito._
class BrokerBridgeSpec extends FunSpec with Matchers with OneInstancePerTest
with MockitoSugar
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
index d361708..61ca88b 100644
--- a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
+++ b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
@@ -80,14 +80,15 @@ trait StandardComponentInitialization extends ComponentInitialization {
config, scalaInterpreter, dependencyDownloader)
val kernel = initializeKernel(
- config, actorLoader, manager, commManager, pluginManager
+ config, actorLoader, manager, commManager, pluginManager
)
- initializePlugins(config, pluginManager)
+ initializeSparkContext(config, kernel, appName)
+
+ manager.initializeInterpreters(kernel)
val responseMap = initializeResponseMap()
- initializeSparkContext(config, kernel, appName)
(commStorage, commRegistrar, commManager,
manager.defaultInterpreter.orNull, kernel,
@@ -145,14 +146,6 @@ trait StandardComponentInitialization extends ComponentInitialization {
commManager,
pluginManager
)
- /*
- interpreter.doQuietly {
- interpreter.bind(
- "kernel", "org.apache.toree.kernel.api.Kernel",
- kernel, List( """@transient implicit""")
- )
- }
- */
pluginManager.dependencyManager.add(kernel)
kernel
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel/src/main/scala/org/apache/toree/boot/layer/InterpreterManager.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/InterpreterManager.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/InterpreterManager.scala
index 1b5ad72..a0409fe 100644
--- a/kernel/src/main/scala/org/apache/toree/boot/layer/InterpreterManager.scala
+++ b/kernel/src/main/scala/org/apache/toree/boot/layer/InterpreterManager.scala
@@ -57,13 +57,11 @@ object InterpreterManager {
config.getStringList("default_interpreter_plugin").asScala
val m = ip.foldLeft(Map[String, Interpreter]())( (acc, v) => {
+
v.split(":") match {
case Array(name, className) =>
try {
- val i = Class
- .forName(className)
- .newInstance()
- .asInstanceOf[Interpreter]
+ val i = instantiate(className, config)
acc + (name -> i)
}
catch {
@@ -80,4 +78,29 @@ object InterpreterManager {
InterpreterManager(interpreters = m, default = default)
}
+
+ /**
+ * instantiate will look for a constructor that takes a Config. If available, it will
+ * call that; otherwise it will assume that there is a default empty constructor.
+ * @param className
+ * @param config
+ * @return
+ */
+ private def instantiate(className:String, config:Config):Interpreter = {
+ try {
+ Class
+ .forName(className)
+ .getConstructor(Class.forName("com.typesafe.config.Config"))
+ .newInstance(config).asInstanceOf[Interpreter]
+ }
+ catch {
+ case e: NoSuchMethodException =>
+ logger.debug("Using default constructor for class " + className)
+ Class
+ .forName(className)
+ .newInstance().asInstanceOf[Interpreter]
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
index bf7f56a..777cd0f 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
@@ -110,9 +110,6 @@ class Kernel (
*/
val data: java.util.Map[String, Any] = new ConcurrentHashMap[String, Any]()
-
- interpreterManager.initializeInterpreters(this)
-
val interpreter = interpreterManager.defaultInterpreter.get
/**
@@ -356,9 +353,6 @@ class Kernel (
val sparkMaster = _sparkConf.getOption("spark.master").getOrElse("not_set")
logger.info( s"Connecting to spark.master $sparkMaster")
- updateInterpreterWithSparkContext(interpreter, sparkContext)
- updateInterpreterWithSqlContext(interpreter, sqlContext)
-
// TODO: Convert to events
pluginManager.dependencyManager.add(_sparkConf)
pluginManager.dependencyManager.add(_sparkContext)
@@ -414,14 +408,6 @@ class Kernel (
sparkContext
}
- // TODO: Think of a better way to test without exposing this
- protected[kernel] def updateInterpreterWithSparkContext(
- interpreter: Interpreter, sparkContext: SparkContext
- ) = {
-
- interpreter.bindSparkContext(sparkContext)
- }
-
protected[kernel] def initializeSqlContext(
sparkContext: SparkContext
): SQLContext = {
@@ -451,12 +437,6 @@ class Kernel (
sqlContext
}
- protected[kernel] def updateInterpreterWithSqlContext(
- interpreter: Interpreter, sqlContext: SQLContext
- ): Unit = {
- interpreter.bindSqlContext(sqlContext)
- }
-
override def interpreter(name: String): Option[Interpreter] = {
interpreterManager.interpreters.get(name)
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala b/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
index 62da297..08effdb 100644
--- a/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
+++ b/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
@@ -66,7 +66,7 @@ class InterpreterActorSpecForIntegration extends TestKit(
TaskManager.DefaultMaximumWorkers
}
- override protected def bindKernelVarialble(kernel: KernelLike): Unit = { }
+ override protected def bindKernelVariable(kernel: KernelLike): Unit = { }
}
private val conf = new SparkConf()
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala b/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
index 6a401b9..cfd7ea0 100644
--- a/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
+++ b/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
@@ -47,7 +47,7 @@ class PostProcessorSpecForIntegration extends FunSpec with Matchers
TaskManager.DefaultMaximumWorkers
}
- override protected def bindKernelVarialble(kernel: KernelLike): Unit = { }
+ override protected def bindKernelVariable(kernel: KernelLike): Unit = { }
}
scalaInterpreter.init(mock[KernelLike])
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
index 4926aef..ae6e15a 100644
--- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
@@ -16,10 +16,8 @@
*/
package org.apache.toree.kernel.interpreter.pyspark
-import org.apache.toree.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, SQLContextProducerLike, JavaSparkContextProducerLike}
-import org.apache.toree.interpreter.broker.{BrokerState, BrokerBridge}
+import org.apache.toree.interpreter.broker.{BrokerBridge, BrokerState}
import org.apache.toree.kernel.api.KernelLike
-import org.apache.spark.SparkContext
/**
* Represents constants for the PySpark bridge.
@@ -43,7 +41,7 @@ object PySparkBridge {
new PySparkBridge(
_brokerState = brokerState,
_kernel = kernel
- ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
+ )
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
index 4a40fb7..b07200d 100644
--- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -18,11 +18,10 @@ package org.apache.toree.kernel.interpreter.pyspark
import java.net.URL
+import com.typesafe.config.Config
import org.apache.toree.interpreter.Results.Result
import org.apache.toree.interpreter._
import org.apache.toree.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
import org.slf4j.LoggerFactory
import py4j.GatewayServer
@@ -80,12 +79,6 @@ class PySparkInterpreter(
this
}
- // Unsupported (but can be invoked)
- override def bindSparkContext(sparkContext: SparkContext): Unit = {}
-
- // Unsupported (but can be invoked)
- override def bindSqlContext(sqlContext: SQLContext): Unit = {}
-
/**
* Executes the provided code with the option to silence output.
* @param code The code to execute
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala b/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala
index 249441e..5a888fe 100644
--- a/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala
+++ b/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala
@@ -22,49 +22,60 @@ import java.net.{URL, URLClassLoader}
import java.nio.charset.Charset
import java.util.concurrent.ExecutionException
-import akka.actor.Actor
-import akka.actor.Actor.Receive
+import com.typesafe.config.Config
+import org.apache.spark.SparkContext
+import org.apache.spark.repl.{SparkCommandLine, SparkIMain, SparkJLineCompletion}
+import org.apache.spark.sql.SQLContext
import org.apache.toree.global.StreamState
-import org.apache.toree.interpreter
import org.apache.toree.interpreter._
import org.apache.toree.interpreter.imports.printers.{WrapperConsole, WrapperSystem}
import org.apache.toree.kernel.api.{KernelLike, KernelOptions}
import org.apache.toree.utils.{MultiOutputStream, TaskManager}
-import org.apache.spark.SparkContext
-import org.apache.spark.repl.{SparkCommandLine, SparkIMain, SparkJLineCompletion}
-import org.apache.spark.sql.SQLContext
import org.slf4j.LoggerFactory
import scala.annotation.tailrec
import scala.concurrent.{Await, Future}
import scala.language.reflectiveCalls
import scala.tools.nsc.backend.JavaPlatform
-import scala.tools.nsc.interpreter.{OutputStream, IR, JPrintWriter, InputStream}
+import scala.tools.nsc.interpreter.{IR, InputStream, JPrintWriter, OutputStream}
import scala.tools.nsc.io.AbstractFile
import scala.tools.nsc.util.{ClassPath, MergedClassPath}
import scala.tools.nsc.{Global, Settings, io}
import scala.util.{Try => UtilTry}
-class ScalaInterpreter() extends Interpreter {
+class ScalaInterpreter(config:Config) extends Interpreter {
protected val logger = LoggerFactory.getLogger(this.getClass.getName)
private val ExecutionExceptionName = "lastException"
- protected var settings: Settings = null
-
protected val _thisClassloader = this.getClass.getClassLoader
+
protected val _runtimeClassloader =
new URLClassLoader(Array(), _thisClassloader) {
def addJar(url: URL) = this.addURL(url)
}
+ protected val lastResultOut = new ByteArrayOutputStream()
- protected val lastResultOut = new ByteArrayOutputStream()
protected val multiOutputStream = MultiOutputStream(List(Console.out, lastResultOut))
private var taskManager: TaskManager = _
var sparkIMain: SparkIMain = _
protected var jLineCompleter: SparkJLineCompletion = _
+ protected var settings: Settings = newSettings(interpreterArgs())
+
+ settings.classpath.value = buildClasspath(_thisClassloader)
+ settings.embeddedDefaults(_runtimeClassloader)
+
+ private val maxInterpreterThreads: Int = {
+ if(config.hasPath("max_interpreter_threads"))
+ config.getInt("max_interpreter_threads")
+ else
+ TaskManager.DefaultMaximumWorkers
+ }
+
+ start()
+
protected def newSparkIMain(
settings: Settings, out: JPrintWriter
): SparkIMain = {
@@ -74,8 +85,6 @@ class ScalaInterpreter() extends Interpreter {
s
}
- private var maxInterpreterThreads:Int = TaskManager.DefaultMaximumWorkers
-
protected def newTaskManager(): TaskManager =
new TaskManager(maximumWorkers = maxInterpreterThreads)
@@ -193,16 +202,9 @@ class ScalaInterpreter() extends Interpreter {
}
override def init(kernel: KernelLike): Interpreter = {
- val args = interpreterArgs(kernel)
- this.settings = newSettings(args)
-
- this.settings.classpath.value = buildClasspath(_thisClassloader)
- this.settings.embeddedDefaults(_runtimeClassloader)
-
- maxInterpreterThreads = maxInterpreterThreads(kernel)
-
- start()
- bindKernelVarialble(kernel)
+ bindKernelVariable(kernel)
+ bindSparkContext(kernel.sparkContext)
+ bindSqlContext(kernel.sqlContext)
this
}
@@ -228,16 +230,12 @@ class ScalaInterpreter() extends Interpreter {
urls.foldLeft("")((l, r) => ClassPath.join(l, r.toString))
}
- protected def interpreterArgs(kernel: KernelLike): List[String] = {
+ protected def interpreterArgs(): List[String] = {
import scala.collection.JavaConverters._
- kernel.config.getStringList("interpreter_args").asScala.toList
+ config.getStringList("interpreter_args").asScala.toList
}
- protected def maxInterpreterThreads(kernel: KernelLike): Int = {
- kernel.config.getInt("max_interpreter_threads")
- }
-
- protected def bindKernelVarialble(kernel: KernelLike): Unit = {
+ protected def bindKernelVariable(kernel: KernelLike): Unit = {
doQuietly {
bind(
"kernel", "org.apache.toree.kernel.api.Kernel",
@@ -515,17 +513,12 @@ class ScalaInterpreter() extends Interpreter {
sparkIMain.beQuietDuring[T](body)
}
- override def bindSparkContext(sparkContext: SparkContext) = {
+ def bindSparkContext(sparkContext: SparkContext) = {
val bindName = "sc"
doQuietly {
logger.debug(s"Binding SparkContext into interpreter as $bindName")
- bind(
- bindName,
- "org.apache.spark.SparkContext",
- sparkContext,
- List( """@transient""")
- )
+ interpret(s"""def ${bindName}: org.apache.spark.SparkContext = kernel.sparkContext""")
// NOTE: This is needed because interpreter blows up after adding
// dependencies to SparkContext and Interpreter before the
@@ -546,20 +539,15 @@ class ScalaInterpreter() extends Interpreter {
}
}
- override def bindSqlContext(sqlContext: SQLContext): Unit = {
+ def bindSqlContext(sqlContext: SQLContext): Unit = {
val bindName = "sqlContext"
doQuietly {
// TODO: This only adds the context to the main interpreter AND
// is limited to the Scala interpreter interface
logger.debug(s"Binding SQLContext into interpreter as $bindName")
- bind(
- bindName,
- classOf[SQLContext].getName,
- sqlContext,
- List( """@transient""")
- )
+ interpret(s"""def ${bindName}: ${classOf[SQLContext].getName} = kernel.sqlContext""")
sqlContext
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala b/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
index 2d5faa4..b4bf596 100644
--- a/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
+++ b/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
@@ -46,7 +46,7 @@ class AddExternalJarMagicSpecForIntegration
TaskManager.DefaultMaximumWorkers
}
- override protected def bindKernelVarialble(kernel: KernelLike): Unit = { }
+ override protected def bindKernelVariable(kernel: KernelLike): Unit = { }
}
interpreter.init(mock[KernelLike])
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
index 5d82e51..a9cfc63 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
@@ -16,10 +16,8 @@
*/
package org.apache.toree.kernel.interpreter.sparkr
-import org.apache.toree.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, JavaSparkContextProducerLike, SQLContextProducerLike}
-import org.apache.toree.interpreter.broker.{BrokerState, BrokerBridge}
+import org.apache.toree.interpreter.broker.{BrokerBridge, BrokerState}
import org.apache.toree.kernel.api.KernelLike
-import org.apache.spark.SparkContext
/**
* Represents constants for the SparkR bridge.
@@ -60,7 +58,7 @@ object SparkRBridge {
new SparkRBridge(
_brokerState = brokerState,
_kernel = kernel
- ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
+ )
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
index 851a2d2..c99ada0 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
@@ -21,8 +21,6 @@ import java.net.URL
import org.apache.toree.interpreter.Results.Result
import org.apache.toree.interpreter._
import org.apache.toree.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
import org.slf4j.LoggerFactory
import scala.concurrent.Await
@@ -135,12 +133,6 @@ class SparkRInterpreter(
// Unsupported
override def classServerURI: String = ""
- // Unsupported (but can be invoked)
- override def bindSparkContext(sparkContext: SparkContext): Unit = {}
-
- // Unsupported (but can be invoked)
- override def bindSqlContext(sqlContext: SQLContext): Unit = {}
-
// Unsupported
override def interrupt(): Interpreter = ???
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
index be10e63..b44eae4 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
@@ -16,15 +16,13 @@
*/
package org.apache.toree.kernel.interpreter.sparkr
-import java.util.concurrent.{TimeUnit, Semaphore}
+import java.util.concurrent.{Semaphore, TimeUnit}
import org.apache.toree.interpreter.broker.BrokerService
-import org.apache.toree.kernel.api.KernelLike
import org.apache.toree.kernel.interpreter.sparkr.SparkRTypes.{Code, CodeResults}
-import org.apache.spark.SparkContext
import org.slf4j.LoggerFactory
-import scala.concurrent.{future, Future}
+import scala.concurrent.{Future, future}
/**
* Represents the service that provides the high-level interface between the
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5fe5245a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
index b9d534f..207ab57 100644
--- a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
@@ -18,15 +18,13 @@ package org.apache.toree.kernel.interpreter.sql
import java.net.URL
-import org.apache.toree.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter}
import org.apache.toree.interpreter.Results.Result
+import org.apache.toree.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter}
import org.apache.toree.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import scala.concurrent.duration._
import scala.concurrent.Await
-import scala.tools.nsc.interpreter.{OutputStream, InputStream}
+import scala.concurrent.duration._
+import scala.tools.nsc.interpreter.{InputStream, OutputStream}
/**
* Represents an interpreter interface to Spark SQL.
@@ -103,12 +101,6 @@ class SqlInterpreter() extends Interpreter {
// Unsupported
override def classServerURI: String = ""
- // Unsupported (but can be invoked)
- override def bindSparkContext(sparkContext: SparkContext): Unit = {}
-
- // Unsupported (but can be invoked)
- override def bindSqlContext(sqlContext: SQLContext): Unit = {}
-
// Unsupported
override def interrupt(): Interpreter = ???