Posted to commits@toree.apache.org by lr...@apache.org on 2016/01/11 22:01:50 UTC
[08/50] [abbrv] incubator-toree git commit: make sure we can instantiate ScalaInterpreter the same as the rest
make sure we can instantiate ScalaInterpreter the same as the rest
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/09ad06e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/09ad06e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/09ad06e0
Branch: refs/heads/master
Commit: 09ad06e0f4d40706f190e2a1fe692716a650e493
Parents: 25b343b
Author: Brian Burns <bb...@us.ibm.com>
Authored: Thu Nov 12 16:47:45 2015 -0500
Committer: Brian Burns <bb...@us.ibm.com>
Committed: Thu Nov 12 16:47:45 2015 -0500
----------------------------------------------------------------------
.../com/ibm/spark/kernel/api/KernelLike.scala | 3 +
.../boot/layer/ComponentInitialization.scala | 116 ++-----------------
.../spark/boot/layer/InterpreterManager.scala | 2 +-
.../scala/com/ibm/spark/kernel/api/Kernel.scala | 17 ++-
.../com/ibm/spark/kernel/api/KernelSpec.scala | 2 +-
.../InterpreterActorSpecForIntegration.scala | 25 +++-
.../PostProcessorSpecForIntegration.scala | 27 ++++-
.../scala/test/utils/SparkKernelDeployer.scala | 45 +------
resources/compile/reference.conf | 1 +
resources/test/reference.conf | 1 +
.../interpreter/scala/ScalaInterpreter.scala | 67 +++++++----
.../scala/ScalaInterpreterSpec.scala | 5 +-
.../AddExternalJarMagicSpecForIntegration.scala | 26 ++++-
13 files changed, 136 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala b/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala
index 8fb5d80..c9442aa 100644
--- a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala
+++ b/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala
@@ -18,6 +18,7 @@ package com.ibm.spark.kernel.api
import java.io.{PrintStream, InputStream, OutputStream}
+import com.typesafe.config.Config
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
@@ -93,6 +94,8 @@ trait KernelLike {
def interpreter(name: String): Option[com.ibm.spark.interpreter.Interpreter]
+ def config: Config
+
def sparkContext: SparkContext
def sparkConf: SparkConf
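The practical effect of the new accessor: an interpreter no longer needs its settings passed through its constructor, it can pull them from the kernel at init time. A minimal sketch (hypothetical helper class, not part of this commit; the two config keys are the ones ScalaInterpreter.init reads later in this diff):

import com.ibm.spark.kernel.api.KernelLike
import scala.collection.JavaConverters._

// Hypothetical example: reading kernel configuration through the new
// KernelLike.config accessor instead of constructor arguments.
class ConfigReadingExample {
  def describe(kernel: KernelLike): String = {
    val args = kernel.config.getStringList("interpreter_args").asScala.toList
    val maxThreads = kernel.config.getInt("max_interpreter_threads")
    s"$maxThreads interpreter threads, args: ${args.mkString(" ")}"
  }
}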
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
index e403e9b..973075b 100644
--- a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
+++ b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
@@ -25,10 +25,6 @@ import com.ibm.spark.dependencies.{DependencyDownloader, IvyDependencyDownloader
import com.ibm.spark.global
import com.ibm.spark.interpreter._
import com.ibm.spark.kernel.api.{KernelLike, Kernel}
-import com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter
-import com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter
-import com.ibm.spark.kernel.interpreter.scala.{TaskManagerProducerLike, StandardSparkIMainProducer, StandardSettingsProducer, ScalaInterpreter}
-import com.ibm.spark.kernel.interpreter.sql.SqlInterpreter
import com.ibm.spark.kernel.protocol.v5.KMBuilder
import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
import com.ibm.spark.kernel.protocol.v5.stream.KernelOutputStream
@@ -81,105 +77,28 @@ trait StandardComponentInitialization extends ComponentInitialization {
) = {
val (commStorage, commRegistrar, commManager) =
initializeCommObjects(actorLoader)
- val interpreter = initializeInterpreter(config)
+
+ val manager = InterpreterManager(config)
+ val scalaInterpreter = manager.interpreters.get("Scala").orNull
val dependencyDownloader = initializeDependencyDownloader(config)
val magicLoader = initializeMagicLoader(
- config, interpreter, dependencyDownloader)
- val manager = InterpreterManager(config)
- .addInterpreter("Scala",interpreter)
+ config, scalaInterpreter, dependencyDownloader)
+
val kernel = initializeKernel(
config, actorLoader, manager, commManager, magicLoader
)
- val responseMap = initializeResponseMap()
-
- /*
- // NOTE: Tested via initializing the following and returning this
- // interpreter instead of the Scala one
- val pySparkInterpreter = new PySparkInterpreter(kernel)
- //pySparkInterpreter.start()
- kernel.data.put("PySpark", pySparkInterpreter)
-
- // NOTE: Tested via initializing the following and returning this
- // interpreter instead of the Scala one
- val sparkRInterpreter = new SparkRInterpreter(kernel)
- //sparkRInterpreter.start()
- kernel.data.put("SparkR", sparkRInterpreter)
-
- val sqlInterpreter = new SqlInterpreter(kernel)
- //sqlInterpreter.start()
- kernel.data.put("SQL", sqlInterpreter)
-
-
- val plugins = initializeInterpreterPlugins(kernel, config)
-
- kernel.data.putAll(plugins.asJava)
-
- // Add Scala to available data map
- kernel.data.put("Scala", interpreter)
- val defaultInterpreter: Interpreter =
- config.getString("default_interpreter").toLowerCase match {
- case "scala" =>
- logger.info("Using Scala interpreter as default!")
- interpreter.doQuietly {
- interpreter.bind(
- "kernel", "com.ibm.spark.kernel.api.Kernel",
- kernel, List( """@transient implicit""")
- )
- }
- interpreter
- case "pyspark" =>
- logger.info("Using PySpark interpreter as default!")
- pySparkInterpreter
- case "sparkr" =>
- logger.info("Using SparkR interpreter as default!")
- sparkRInterpreter
- case "sql" =>
- logger.info("Using SQL interpreter as default!")
- sqlInterpreter
- case p if(kernel.data.containsKey(p)) =>
- kernel.data.get(p).asInstanceOf[Interpreter]
- case unknown =>
- logger.warn(s"Unknown interpreter '$unknown'! Defaulting to Scala!")
- interpreter
- }
+ val responseMap = initializeResponseMap()
- */
- //kernel.interpreter = defaultInterpreter
initializeSparkContext(config, kernel, appName)
(commStorage, commRegistrar, commManager,
- manager.defaultInterpreter.getOrElse(null), kernel,
+ manager.defaultInterpreter.orNull, kernel,
dependencyDownloader, magicLoader, responseMap)
}
- def initializeInterpreterPlugins(
- kernel: KernelLike,
- config: Config
- ): Map[String, Interpreter] = {
- val p = config
- .getStringList("interpreter_plugins")
- .listIterator().asScala
-
- p.foldLeft(Map[String, Interpreter]())( (acc, v) => {
- v.split(":") match {
- case Array(name, className) =>
- try {
- acc + (name -> Class
- .forName(className)
- .getConstructor(classOf[KernelLike])
- .newInstance(kernel)
- .asInstanceOf[Interpreter])
- }
- catch {
- case _:Throwable => acc
- }
- case _ => acc
- }
- })
- }
def initializeSparkContext(config:Config, kernel:Kernel, appName:String) = {
if(!config.getBoolean("nosparkcontext")) {
@@ -209,27 +128,6 @@ trait StandardComponentInitialization extends ComponentInitialization {
dependencyDownloader
}
- protected def initializeInterpreter(config: Config) = {
- val interpreterArgs = config.getStringList("interpreter_args").asScala.toList
- val maxInterpreterThreads = config.getInt("max_interpreter_threads")
-
- logger.info(
- s"Constructing interpreter with $maxInterpreterThreads threads and " +
- "with arguments: " + interpreterArgs.mkString(" "))
- val interpreter = new ScalaInterpreter(interpreterArgs, Console.out)
- with StandardSparkIMainProducer
- with TaskManagerProducerLike
- with StandardSettingsProducer {
- override def newTaskManager(): TaskManager =
- new TaskManager(maximumWorkers = maxInterpreterThreads)
- }
-
- logger.debug("Starting interpreter")
- interpreter.start()
-
- interpreter
- }
-
protected[layer] def initializeSqlContext(sparkContext: SparkContext) = {
val sqlContext: SQLContext = try {
logger.info("Attempting to create Hive Context")
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala
index 903ab69..520d68c 100644
--- a/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala
+++ b/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala
@@ -26,7 +26,7 @@ case class InterpreterManager(
copy(interpreters = interpreters + (name -> interpreter))
}
- def defaultInterpreter(): Option[Interpreter] = {
+ def defaultInterpreter: Option[Interpreter] = {
interpreters.get(default)
}
}
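Two things are worth noting in this small change: InterpreterManager is an immutable case class, so addInterpreter returns a copy rather than mutating, and defaultInterpreter is now an accessor-style parameterless method (hence the call-site updates in Kernel and KernelSpec below). A REPL-style usage sketch, assuming config and scalaInterpreter are in scope:

import com.ibm.spark.boot.layer.InterpreterManager
import com.ibm.spark.interpreter.Interpreter

val manager = InterpreterManager(config)                          // built from config
val withScala = manager.addInterpreter("Scala", scalaInterpreter) // returns a copy
val default: Option[Interpreter] = withScala.defaultInterpreter   // no parens now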
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala b/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
index 9a91a2e..fe6bc2d 100644
--- a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
@@ -46,19 +46,20 @@ import com.ibm.spark.global.ExecuteRequestState
/**
* Represents the main kernel API to be used for interaction.
*
- * @param config The configuration used when starting the kernel
+ * @param _config The configuration used when starting the kernel
* @param interpreterManager The interpreter manager to expose in this instance
* @param comm The Comm manager to expose in this instance
* @param actorLoader The actor loader to use for message relaying
*/
@Experimental
class Kernel (
- private val config: Config,
+ private val _config: Config,
private val actorLoader: ActorLoader,
val interpreterManager: InterpreterManager,
val comm: CommManager,
val magicLoader: MagicLoader
) extends KernelLike with LogLike {
+
/**
* Represents the current input stream used by the kernel for the specific
* thread.
@@ -111,7 +112,7 @@ class Kernel (
interpreterManager.initializeInterpreters(this)
- val interpreter = interpreterManager.defaultInterpreter().get
+ val interpreter = interpreterManager.defaultInterpreter.get
/**
* Handles the output of interpreting code.
@@ -136,6 +137,10 @@ class Kernel (
}
}
+ override def config:Config = {
+ _config
+ }
+
/**
* Executes a block of code represented as a string and returns the result.
*
@@ -202,7 +207,7 @@ class Kernel (
parentMessage: v5.KernelMessage = lastKernelMessage(),
kmBuilder: v5.KMBuilder = v5.KMBuilder()
): FactoryMethods = {
- new FactoryMethods(config, actorLoader, parentMessage, kmBuilder)
+ new FactoryMethods(_config, actorLoader, parentMessage, kmBuilder)
}
/**
@@ -347,7 +352,7 @@ class Kernel (
conf.set("spark.submit.deployMode", "client")
KeyValuePairUtils.stringToKeyValuePairSeq(
- config.getString("spark_configuration")
+ _config.getString("spark_configuration")
).foreach { keyValuePair =>
logger.info(s"Setting ${keyValuePair.key} to ${keyValuePair.value}")
Try(conf.set(keyValuePair.key, keyValuePair.value))
@@ -372,7 +377,7 @@ class Kernel (
var sparkContext: SparkContext = null
val outStream = new KernelOutputStream(
actorLoader, KMBuilder(), global.ScheduledTaskManager.instance,
- sendEmptyOutput = config.getBoolean("send_empty_output")
+ sendEmptyOutput = _config.getBoolean("send_empty_output")
)
// Update global stream state and use it to set the Console local variables
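The config -> _config rename is forced by the trait change above: Kernel must now implement KernelLike's abstract def config: Config, and a private constructor val with the same name would collide with that override. A stripped-down sketch of the pattern (KernelLike reduced to the one member for illustration):

import com.typesafe.config.{Config, ConfigFactory}

trait MiniKernelLike { def config: Config }  // stand-in for KernelLike

// The constructor parameter is renamed so it does not clash with the
// abstract method it must implement.
class MiniKernel(private val _config: Config) extends MiniKernelLike {
  override def config: Config = _config
}

val k = new MiniKernel(ConfigFactory.parseString("send_empty_output = false"))
println(k.config.getBoolean("send_empty_output"))  // false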
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
index f5d5517..a5756a9 100644
--- a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
+++ b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
@@ -44,7 +44,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
mockInterpreterManager = mock[InterpreterManager]
mockSparkContext = mock[SparkContext]
mockSparkConf = mock[SparkConf]
- when(mockInterpreterManager.defaultInterpreter())
+ when(mockInterpreterManager.defaultInterpreter)
.thenReturn(Some(mockInterpreter))
when(mockInterpreterManager.interpreters)
.thenReturn(Map[String, com.ibm.spark.interpreter.Interpreter]())
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala b/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
index 93c86bc..09aa6fd 100644
--- a/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
+++ b/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
@@ -21,11 +21,13 @@ import java.io.{ByteArrayOutputStream, OutputStream}
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import com.ibm.spark.interpreter._
+import com.ibm.spark.kernel.api.KernelLike
import com.ibm.spark.kernel.interpreter.scala.{StandardTaskManagerProducer, StandardSparkIMainProducer, StandardSettingsProducer, ScalaInterpreter}
import com.ibm.spark.kernel.protocol.v5._
import com.ibm.spark.kernel.protocol.v5.content._
import com.ibm.spark.kernel.protocol.v5.interpreter.InterpreterActor
import com.ibm.spark.kernel.protocol.v5.interpreter.tasks.InterpreterTaskFactory
+import com.ibm.spark.utils.MultiOutputStream
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.mock.MockitoSugar
@@ -52,11 +54,25 @@ class InterpreterActorSpecForIntegration extends TestKit(
with MockitoSugar with UncaughtExceptionSuppression {
private val output = new ByteArrayOutputStream()
- private val interpreter = new ScalaInterpreter(List(), output)
- with StandardSparkIMainProducer
- with StandardTaskManagerProducer
- with StandardSettingsProducer
+ private val interpreter = new ScalaInterpreter {
+ override protected val multiOutputStream = MultiOutputStream(List(mock[OutputStream], lastResultOut))
+ override def init(kernel: KernelLike): Interpreter = {
+ settings = newSettings(List[String]())
+
+ val urls = _thisClassloader match {
+ case cl: java.net.URLClassLoader => cl.getURLs.toList
+ case a => // TODO: Should we really be using sys.error here?
+ sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
+ }
+ val classpath = urls.map(_.toString)
+
+ settings.classpath.value =
+ classpath.distinct.mkString(java.io.File.pathSeparator)
+ settings.embeddedDefaults(_runtimeClassloader)
+ this
+ }
+ }
private val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Test Kernel")
@@ -65,6 +81,7 @@ class InterpreterActorSpecForIntegration extends TestKit(
before {
output.reset()
+ interpreter.init(mock[KernelLike])
interpreter.start()
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala b/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
index 28d08b1..2bdce57 100644
--- a/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
+++ b/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
@@ -18,8 +18,11 @@ package integration
import java.io.OutputStream
+import com.ibm.spark.interpreter.Interpreter
+import com.ibm.spark.kernel.api.KernelLike
import com.ibm.spark.kernel.interpreter.scala.{StandardSettingsProducer, StandardTaskManagerProducer, StandardSparkIMainProducer, ScalaInterpreter}
import com.ibm.spark.kernel.protocol.v5.magic.PostProcessor
+import com.ibm.spark.utils.MultiOutputStream
import org.scalatest.mock.MockitoSugar
import org.scalatest.{BeforeAndAfter, Matchers, FunSpec}
@@ -32,10 +35,26 @@ class PostProcessorSpecForIntegration extends FunSpec with Matchers
before {
// TODO: Move instantiation and start of interpreter to a beforeAll
// for performance improvements
- scalaInterpreter = new ScalaInterpreter(Nil, mock[OutputStream])
- with StandardSparkIMainProducer
- with StandardTaskManagerProducer
- with StandardSettingsProducer
+ scalaInterpreter = new ScalaInterpreter {
+ override protected val multiOutputStream = MultiOutputStream(List(mock[OutputStream], lastResultOut))
+ override def init(kernel: KernelLike): Interpreter = {
+ settings = newSettings(List[String]())
+
+ val urls = _thisClassloader match {
+ case cl: java.net.URLClassLoader => cl.getURLs.toList
+ case a => // TODO: Should we really be using sys.error here?
+ sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
+ }
+ val classpath = urls.map(_.toString)
+
+ settings.classpath.value =
+ classpath.distinct.mkString(java.io.File.pathSeparator)
+ settings.embeddedDefaults(_runtimeClassloader)
+
+ this
+ }
+ }
+ scalaInterpreter.init(mock[KernelLike])
scalaInterpreter.start()
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala b/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala
index 4f0412a..877b162 100644
--- a/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala
+++ b/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala
@@ -62,50 +62,7 @@ object SparkKernelDeployer extends LogLike with MockitoSugar {
}
}
- private trait ExposedComponentInitialization extends StandardComponentInitialization
- with LogLike {
- override protected def initializeInterpreter(config: Config): ScalaInterpreter
- with StandardSparkIMainProducer with StandardTaskManagerProducer
- with StandardSettingsProducer = {
- val interpreterArgs = config.getStringList("interpreter_args").asScala.toList
-
- logger.info("Constructing interpreter with arguments: " +
- interpreterArgs.mkString(" "))
- val interpreter = new ScalaInterpreter(interpreterArgs, mock[OutputStream])
- with StandardSparkIMainProducer
- with StandardTaskManagerProducer
- with StandardSettingsProducer
-
- logger.debug("Starting interpreter")
- interpreter.start()
-
- interpreter
- }
-
-
- /*
- def reallyInitializeSparkContext(
- config: Config,
- actorLoader: ActorLoader,
- kmBuilder: KMBuilder,
- sparkConf: SparkConf
- ): SparkContext = {
- logger.debug("Constructing new Spark Context")
- // TODO: Inject stream redirect headers in Spark dynamically
- var sparkContext: SparkContext = null
- val outStream = new KernelOutputStream(
- actorLoader, KMBuilder(), global.ScheduledTaskManager.instance)
- global.StreamState.setStreams(System.in, outStream, outStream)
- global.StreamState.withStreams {
- sparkContext = SparkContextProvider.sparkContext
- }
-
- sparkContext
- }
- */
-
- }
/**
* Runs bare initialization, wrapping socket actors with test logic to
@@ -200,7 +157,7 @@ object SparkKernelDeployer extends LogLike with MockitoSugar {
val kernelBootstrap =
(new KernelBootstrap(new CommandLineOptions(Nil).toConfig)
with ExposedBareInitialization
- with ExposedComponentInitialization
+ with StandardComponentInitialization
with StandardHandlerInitialization
with StandardHookInitialization).initialize()
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/resources/compile/reference.conf
----------------------------------------------------------------------
diff --git a/resources/compile/reference.conf b/resources/compile/reference.conf
index 2aa6d63..d64d2b8 100644
--- a/resources/compile/reference.conf
+++ b/resources/compile/reference.conf
@@ -60,6 +60,7 @@ default_interpreter = "Scala"
default_interpreter = ${?DEFAULT_INTERPRETER}
default_interpreter_plugin = [
+ "Scala:com.ibm.spark.kernel.interpreter.scala.ScalaInterpreter",
"PySpark:com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter",
"SparkR:com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter",
"SQL:com.ibm.spark.kernel.interpreter.sql.SqlInterpreter"
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/resources/test/reference.conf
----------------------------------------------------------------------
diff --git a/resources/test/reference.conf b/resources/test/reference.conf
index 7f3fcbb..6100469 100644
--- a/resources/test/reference.conf
+++ b/resources/test/reference.conf
@@ -58,6 +58,7 @@ default_interpreter = "Scala"
default_interpreter = ${?DEFAULT_INTERPRETER}
default_interpreter_plugin = [
+ "Scala:com.ibm.spark.kernel.interpreter.scala.ScalaInterpreter",
"PySpark:com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter",
"SparkR:com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter",
"SQL:com.ibm.spark.kernel.interpreter.sql.SqlInterpreter"
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
index bccdf62..5d64b45 100644
--- a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
+++ b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
@@ -30,7 +30,7 @@ import com.ibm.spark.interpreter.imports.printers.{WrapperConsole, WrapperSystem
import com.ibm.spark.kernel.api.{KernelLike, KernelOptions}
import com.ibm.spark.utils.{MultiOutputStream, TaskManager}
import org.apache.spark.SparkContext
-import org.apache.spark.repl.{SparkIMain, SparkJLineCompletion}
+import org.apache.spark.repl.{SparkCommandLine, SparkIMain, SparkJLineCompletion}
import org.slf4j.LoggerFactory
import scala.concurrent.{Await, Future}
@@ -42,44 +42,43 @@ import scala.tools.nsc.util.MergedClassPath
import scala.tools.nsc.{Global, Settings, io}
import scala.util.{Try => UtilTry}
-class ScalaInterpreter(
- args: List[String],
- out: OutputStream
-) extends Interpreter {
- this: SparkIMainProducerLike
- with TaskManagerProducerLike
- with SettingsProducerLike =>
+class ScalaInterpreter() extends Interpreter {
protected val logger = LoggerFactory.getLogger(this.getClass.getName)
private val ExecutionExceptionName = "lastException"
- val settings: Settings = newSettings(args)
+ protected var settings: Settings = null
- private val _thisClassloader = this.getClass.getClassLoader
+ protected val _thisClassloader = this.getClass.getClassLoader
protected val _runtimeClassloader =
new URLClassLoader(Array(), _thisClassloader) {
def addJar(url: URL) = this.addURL(url)
}
- /* Add scala.runtime libraries to interpreter classpath */ {
- val urls = _thisClassloader match {
- case cl: java.net.URLClassLoader => cl.getURLs.toList
- case a => // TODO: Should we really be using sys.error here?
- sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
- }
- val classpath = urls.map(_.toString)
-
- settings.classpath.value =
- classpath.distinct.mkString(java.io.File.pathSeparator)
- settings.embeddedDefaults(_runtimeClassloader)
- }
- private val lastResultOut = new ByteArrayOutputStream()
- private val multiOutputStream = MultiOutputStream(List(out, lastResultOut))
+ protected val lastResultOut = new ByteArrayOutputStream()
+ protected val multiOutputStream = MultiOutputStream(List(Console.out, lastResultOut))
private var taskManager: TaskManager = _
var sparkIMain: SparkIMain = _
protected var jLineCompleter: SparkJLineCompletion = _
+ protected def newSparkIMain(
+ settings: Settings, out: JPrintWriter
+ ): SparkIMain = {
+ val s = new SparkIMain(settings, out)
+ s.initializeSynchronous()
+
+ s
+ }
+
+ private var maxInterpreterThreads:Int = TaskManager.DefaultMaximumWorkers
+
+ protected def newTaskManager(): TaskManager =
+ new TaskManager(maximumWorkers = maxInterpreterThreads)
+
+ protected def newSettings(args: List[String]): Settings =
+ new SparkCommandLine(args).settings
+
/**
* Adds jars to the runtime and compile time classpaths. Does not work with
* directories or expanding star in a path.
@@ -191,6 +190,25 @@ class ScalaInterpreter(
}
override def init(kernel: KernelLike): Interpreter = {
+ import scala.collection.JavaConverters._
+ val args = kernel.config.getStringList("interpreter_args").asScala.toList
+ this.settings = newSettings(args)
+
+ val urls = _thisClassloader match {
+ case cl: java.net.URLClassLoader => cl.getURLs.toList
+ case a => // TODO: Should we really be using sys.error here?
+ sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
+ }
+ val classpath = urls.map(_.toString)
+
+ this.settings.classpath.value =
+ classpath.distinct.mkString(java.io.File.pathSeparator)
+ this.settings.embeddedDefaults(_runtimeClassloader)
+
+ maxInterpreterThreads = kernel.config.getInt("max_interpreter_threads")
+
+ start()
+
doQuietly {
bind(
"kernel", "com.ibm.spark.kernel.api.Kernel",
@@ -201,6 +219,7 @@ class ScalaInterpreter(
this
}
+
override def interrupt(): Interpreter = {
require(sparkIMain != null && taskManager != null)
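Construction and configuration are now separate steps: the zero-argument constructor leaves settings null, and init(kernel) reads interpreter_args and max_interpreter_threads from kernel.config, builds the Settings and classpath, and calls start() itself. A lifecycle sketch, assuming kernel is an initialized KernelLike:

import com.ibm.spark.kernel.interpreter.scala.ScalaInterpreter

val interpreter = new ScalaInterpreter()  // no args, no producer traits
interpreter.init(kernel)                  // configures from kernel.config and
                                          // calls start() internally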
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala b/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala
index 34a75f7..0de546c 100644
--- a/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala
+++ b/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala
@@ -86,10 +86,7 @@ class ScalaInterpreterSpec extends FunSpec
}
class StubbedStartInterpreter
- extends ScalaInterpreter(mock[List[String]], mock[OutputStream])
- with SparkIMainProducerLike
- with TaskManagerProducerLike
- with SettingsProducerLike
+ extends ScalaInterpreter
{
override def newSparkIMain(settings: Settings, out: JPrintWriter): SparkIMain = mockSparkIMain
override def newTaskManager(): TaskManager = mockTaskManager
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala b/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
index 490bff0..4032435 100644
--- a/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
+++ b/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
@@ -20,7 +20,9 @@ import java.io.{ByteArrayOutputStream, OutputStream}
import com.ibm.spark.global.StreamState
import com.ibm.spark.interpreter._
+import com.ibm.spark.kernel.api.KernelLike
import com.ibm.spark.kernel.interpreter.scala.{ScalaInterpreter, StandardSettingsProducer, StandardSparkIMainProducer, StandardTaskManagerProducer}
+import com.ibm.spark.utils.MultiOutputStream
import org.scalatest.mock.MockitoSugar
import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
@@ -32,10 +34,26 @@ class AddExternalJarMagicSpecForIntegration
private var interpreter: Interpreter = _
before {
- interpreter = new ScalaInterpreter(Nil, mock[OutputStream])
- with StandardSparkIMainProducer
- with StandardTaskManagerProducer
- with StandardSettingsProducer
+ interpreter = new ScalaInterpreter {
+ override protected val multiOutputStream = MultiOutputStream(List(mock[OutputStream], lastResultOut))
+ override def init(kernel: KernelLike): Interpreter = {
+ settings = newSettings(List[String]())
+
+ val urls = _thisClassloader match {
+ case cl: java.net.URLClassLoader => cl.getURLs.toList
+ case a => // TODO: Should we really be using sys.error here?
+ sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
+ }
+ val classpath = urls.map(_.toString)
+
+ settings.classpath.value =
+ classpath.distinct.mkString(java.io.File.pathSeparator)
+ settings.embeddedDefaults(_runtimeClassloader)
+
+ this
+ }
+ }
+ interpreter.init(mock[KernelLike])
interpreter.start()
StreamState.setStreams(outputStream = outputResult)
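One observation on the test changes: the same anonymous subclass (overriding multiOutputStream and init) is now repeated in InterpreterActorSpecForIntegration, PostProcessorSpecForIntegration, and this spec. Not part of this commit, but a sketch of factoring it into a shared test helper:

import java.io.OutputStream
import com.ibm.spark.interpreter.Interpreter
import com.ibm.spark.kernel.api.KernelLike
import com.ibm.spark.kernel.interpreter.scala.ScalaInterpreter
import com.ibm.spark.utils.MultiOutputStream
import org.scalatest.mock.MockitoSugar

// Hypothetical test utility deduplicating the three specs above.
object TestScalaInterpreter extends MockitoSugar {
  def apply(): ScalaInterpreter = new ScalaInterpreter {
    override protected val multiOutputStream =
      MultiOutputStream(List(mock[OutputStream], lastResultOut))
    override def init(kernel: KernelLike): Interpreter = {
      settings = newSettings(List[String]())
      val urls = _thisClassloader match {
        case cl: java.net.URLClassLoader => cl.getURLs.toList
        case a =>
          sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
      }
      settings.classpath.value =
        urls.map(_.toString).distinct.mkString(java.io.File.pathSeparator)
      settings.embeddedDefaults(_runtimeClassloader)
      this
    }
  }
}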