You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lr...@apache.org on 2016/01/11 22:01:50 UTC

[08/50] [abbrv] incubator-toree git commit: make sure we can instantiate ScalaInterpreter the same as the rest

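Make sure ScalaInterpreter can be instantiated the same way as the rest of the interpreters (PySpark, SparkR, SQL), i.e. from the interpreter plugin list in the configuration.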


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/09ad06e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/09ad06e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/09ad06e0

Branch: refs/heads/master
Commit: 09ad06e0f4d40706f190e2a1fe692716a650e493
Parents: 25b343b
Author: Brian Burns <bb...@us.ibm.com>
Authored: Thu Nov 12 16:47:45 2015 -0500
Committer: Brian Burns <bb...@us.ibm.com>
Committed: Thu Nov 12 16:47:45 2015 -0500

----------------------------------------------------------------------
 .../com/ibm/spark/kernel/api/KernelLike.scala   |   3 +
 .../boot/layer/ComponentInitialization.scala    | 116 ++-----------------
 .../spark/boot/layer/InterpreterManager.scala   |   2 +-
 .../scala/com/ibm/spark/kernel/api/Kernel.scala |  17 ++-
 .../com/ibm/spark/kernel/api/KernelSpec.scala   |   2 +-
 .../InterpreterActorSpecForIntegration.scala    |  25 +++-
 .../PostProcessorSpecForIntegration.scala       |  27 ++++-
 .../scala/test/utils/SparkKernelDeployer.scala  |  45 +------
 resources/compile/reference.conf                |   1 +
 resources/test/reference.conf                   |   1 +
 .../interpreter/scala/ScalaInterpreter.scala    |  67 +++++++----
 .../scala/ScalaInterpreterSpec.scala            |   5 +-
 .../AddExternalJarMagicSpecForIntegration.scala |  26 ++++-
 13 files changed, 136 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala b/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala
index 8fb5d80..c9442aa 100644
--- a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala
+++ b/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala
@@ -18,6 +18,7 @@ package com.ibm.spark.kernel.api
 
 import java.io.{PrintStream, InputStream, OutputStream}
 
+import com.typesafe.config.Config
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
@@ -93,6 +94,8 @@ trait KernelLike {
 
   def interpreter(name: String): Option[com.ibm.spark.interpreter.Interpreter]
 
+  def config: Config
+
   def sparkContext: SparkContext
 
   def sparkConf: SparkConf

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
index e403e9b..973075b 100644
--- a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
+++ b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
@@ -25,10 +25,6 @@ import com.ibm.spark.dependencies.{DependencyDownloader, IvyDependencyDownloader
 import com.ibm.spark.global
 import com.ibm.spark.interpreter._
 import com.ibm.spark.kernel.api.{KernelLike, Kernel}
-import com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter
-import com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter
-import com.ibm.spark.kernel.interpreter.scala.{TaskManagerProducerLike, StandardSparkIMainProducer, StandardSettingsProducer, ScalaInterpreter}
-import com.ibm.spark.kernel.interpreter.sql.SqlInterpreter
 import com.ibm.spark.kernel.protocol.v5.KMBuilder
 import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
 import com.ibm.spark.kernel.protocol.v5.stream.KernelOutputStream
@@ -81,105 +77,28 @@ trait StandardComponentInitialization extends ComponentInitialization {
   ) = {
     val (commStorage, commRegistrar, commManager) =
       initializeCommObjects(actorLoader)
-    val interpreter = initializeInterpreter(config)
+
+    val manager =  InterpreterManager(config)
+    val scalaInterpreter = manager.interpreters.get("Scala").orNull
 
     val dependencyDownloader = initializeDependencyDownloader(config)
     val magicLoader = initializeMagicLoader(
-      config, interpreter, dependencyDownloader)
-    val manager =  InterpreterManager(config)
-      .addInterpreter("Scala",interpreter)
+      config, scalaInterpreter, dependencyDownloader)
+
     val kernel = initializeKernel(
       config, actorLoader, manager, commManager, magicLoader
     )
-    val responseMap = initializeResponseMap()
-
 
-    /*
-    // NOTE: Tested via initializing the following and returning this
-    //       interpreter instead of the Scala one
-    val pySparkInterpreter = new PySparkInterpreter(kernel)
-    //pySparkInterpreter.start()
-    kernel.data.put("PySpark", pySparkInterpreter)
-
-    // NOTE: Tested via initializing the following and returning this
-    //       interpreter instead of the Scala one
-    val sparkRInterpreter = new SparkRInterpreter(kernel)
-    //sparkRInterpreter.start()
-    kernel.data.put("SparkR", sparkRInterpreter)
-
-    val sqlInterpreter = new SqlInterpreter(kernel)
-    //sqlInterpreter.start()
-    kernel.data.put("SQL", sqlInterpreter)
-
-
-    val plugins = initializeInterpreterPlugins(kernel, config)
-
-    kernel.data.putAll(plugins.asJava)
-
-    // Add Scala to available data map
-    kernel.data.put("Scala", interpreter)
-    val defaultInterpreter: Interpreter =
-      config.getString("default_interpreter").toLowerCase match {
-        case "scala" =>
-          logger.info("Using Scala interpreter as default!")
-          interpreter.doQuietly {
-            interpreter.bind(
-              "kernel", "com.ibm.spark.kernel.api.Kernel",
-              kernel, List( """@transient implicit""")
-            )
-          }
-          interpreter
-        case "pyspark" =>
-          logger.info("Using PySpark interpreter as default!")
-          pySparkInterpreter
-        case "sparkr" =>
-          logger.info("Using SparkR interpreter as default!")
-          sparkRInterpreter
-        case "sql" =>
-          logger.info("Using SQL interpreter as default!")
-          sqlInterpreter
-        case p if(kernel.data.containsKey(p)) =>
-          kernel.data.get(p).asInstanceOf[Interpreter]
-        case unknown =>
-          logger.warn(s"Unknown interpreter '$unknown'! Defaulting to Scala!")
-          interpreter
-      }
+    val responseMap = initializeResponseMap()
 
-    */
-    //kernel.interpreter = defaultInterpreter
     initializeSparkContext(config, kernel, appName)
 
     (commStorage, commRegistrar, commManager,
-      manager.defaultInterpreter.getOrElse(null), kernel,
+      manager.defaultInterpreter.orNull, kernel,
       dependencyDownloader, magicLoader, responseMap)
 
   }
 
-  def initializeInterpreterPlugins(
-    kernel: KernelLike,
-    config: Config
-  ): Map[String, Interpreter] = {
-    val p = config
-      .getStringList("interpreter_plugins")
-      .listIterator().asScala
-
-    p.foldLeft(Map[String, Interpreter]())( (acc, v) => {
-      v.split(":") match {
-        case Array(name, className) =>
-          try {
-            acc + (name -> Class
-              .forName(className)
-              .getConstructor(classOf[KernelLike])
-              .newInstance(kernel)
-              .asInstanceOf[Interpreter])
-          }
-          catch {
-            case _:Throwable => acc
-          }
-        case _ => acc
-      }
-    })
-  }
 
   def initializeSparkContext(config:Config, kernel:Kernel, appName:String) = {
     if(!config.getBoolean("nosparkcontext")) {
@@ -209,27 +128,6 @@ trait StandardComponentInitialization extends ComponentInitialization {
     dependencyDownloader
   }
 
-  protected def initializeInterpreter(config: Config) = {
-    val interpreterArgs = config.getStringList("interpreter_args").asScala.toList
-    val maxInterpreterThreads = config.getInt("max_interpreter_threads")
-
-    logger.info(
-      s"Constructing interpreter with $maxInterpreterThreads threads and " +
-      "with arguments: " + interpreterArgs.mkString(" "))
-    val interpreter = new ScalaInterpreter(interpreterArgs, Console.out)
-      with StandardSparkIMainProducer
-      with TaskManagerProducerLike
-      with StandardSettingsProducer {
-      override def newTaskManager(): TaskManager =
-        new TaskManager(maximumWorkers = maxInterpreterThreads)
-    }
-
-    logger.debug("Starting interpreter")
-    interpreter.start()
-
-    interpreter
-  }
-
   protected[layer] def initializeSqlContext(sparkContext: SparkContext) = {
     val sqlContext: SQLContext = try {
       logger.info("Attempting to create Hive Context")

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala
index 903ab69..520d68c 100644
--- a/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala
+++ b/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala
@@ -26,7 +26,7 @@ case class InterpreterManager(
     copy(interpreters = interpreters + (name -> interpreter))
   }
 
-  def defaultInterpreter(): Option[Interpreter] = {
+  def defaultInterpreter: Option[Interpreter] = {
     interpreters.get(default)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala b/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
index 9a91a2e..fe6bc2d 100644
--- a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
@@ -46,19 +46,20 @@ import com.ibm.spark.global.ExecuteRequestState
 /**
  * Represents the main kernel API to be used for interaction.
  *
- * @param config The configuration used when starting the kernel
+ * @param _config The configuration used when starting the kernel
  * @param interpreterManager The interpreter manager to expose in this instance
  * @param comm The Comm manager to expose in this instance
  * @param actorLoader The actor loader to use for message relaying
  */
 @Experimental
 class Kernel (
-  private val config: Config,
+  private val _config: Config,
   private val actorLoader: ActorLoader,
   val interpreterManager: InterpreterManager,
   val comm: CommManager,
   val magicLoader: MagicLoader
 ) extends KernelLike with LogLike {
+
   /**
    * Represents the current input stream used by the kernel for the specific
    * thread.
@@ -111,7 +112,7 @@ class Kernel (
 
   interpreterManager.initializeInterpreters(this)
 
-  val interpreter = interpreterManager.defaultInterpreter().get
+  val interpreter = interpreterManager.defaultInterpreter.get
 
   /**
    * Handles the output of interpreting code.
@@ -136,6 +137,10 @@ class Kernel (
     }
   }
 
+  override def config:Config = {
+    _config
+  }
+
   /**
    * Executes a block of code represented as a string and returns the result.
    *
@@ -202,7 +207,7 @@ class Kernel (
     parentMessage: v5.KernelMessage = lastKernelMessage(),
     kmBuilder: v5.KMBuilder = v5.KMBuilder()
   ): FactoryMethods = {
-    new FactoryMethods(config, actorLoader, parentMessage, kmBuilder)
+    new FactoryMethods(_config, actorLoader, parentMessage, kmBuilder)
   }
 
   /**
@@ -347,7 +352,7 @@ class Kernel (
     conf.set("spark.submit.deployMode", "client")
 
     KeyValuePairUtils.stringToKeyValuePairSeq(
-      config.getString("spark_configuration")
+      _config.getString("spark_configuration")
     ).foreach { keyValuePair =>
       logger.info(s"Setting ${keyValuePair.key} to ${keyValuePair.value}")
       Try(conf.set(keyValuePair.key, keyValuePair.value))
@@ -372,7 +377,7 @@ class Kernel (
     var sparkContext: SparkContext = null
     val outStream = new KernelOutputStream(
       actorLoader, KMBuilder(), global.ScheduledTaskManager.instance,
-      sendEmptyOutput = config.getBoolean("send_empty_output")
+      sendEmptyOutput = _config.getBoolean("send_empty_output")
     )
 
     // Update global stream state and use it to set the Console local variables

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
index f5d5517..a5756a9 100644
--- a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
+++ b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
@@ -44,7 +44,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
     mockInterpreterManager = mock[InterpreterManager]
     mockSparkContext = mock[SparkContext]
     mockSparkConf = mock[SparkConf]
-    when(mockInterpreterManager.defaultInterpreter())
+    when(mockInterpreterManager.defaultInterpreter)
       .thenReturn(Some(mockInterpreter))
     when(mockInterpreterManager.interpreters)
       .thenReturn(Map[String, com.ibm.spark.interpreter.Interpreter]())

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala b/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
index 93c86bc..09aa6fd 100644
--- a/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
+++ b/kernel/src/test/scala/integration/InterpreterActorSpecForIntegration.scala
@@ -21,11 +21,13 @@ import java.io.{ByteArrayOutputStream, OutputStream}
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit}
 import com.ibm.spark.interpreter._
+import com.ibm.spark.kernel.api.KernelLike
 import com.ibm.spark.kernel.interpreter.scala.{StandardTaskManagerProducer, StandardSparkIMainProducer, StandardSettingsProducer, ScalaInterpreter}
 import com.ibm.spark.kernel.protocol.v5._
 import com.ibm.spark.kernel.protocol.v5.content._
 import com.ibm.spark.kernel.protocol.v5.interpreter.InterpreterActor
 import com.ibm.spark.kernel.protocol.v5.interpreter.tasks.InterpreterTaskFactory
+import com.ibm.spark.utils.MultiOutputStream
 import com.typesafe.config.ConfigFactory
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.mock.MockitoSugar
@@ -52,11 +54,25 @@ class InterpreterActorSpecForIntegration extends TestKit(
   with MockitoSugar with UncaughtExceptionSuppression {
 
   private val output = new ByteArrayOutputStream()
-  private val interpreter = new ScalaInterpreter(List(), output)
-    with StandardSparkIMainProducer
-    with StandardTaskManagerProducer
-    with StandardSettingsProducer
+  private val interpreter = new ScalaInterpreter {
+    override protected val multiOutputStream = MultiOutputStream(List(mock[OutputStream], lastResultOut))
+    override def init(kernel: KernelLike): Interpreter = {
+      settings = newSettings(List[String]())
+
+      val urls = _thisClassloader match {
+        case cl: java.net.URLClassLoader => cl.getURLs.toList
+        case a => // TODO: Should we really be using sys.error here?
+          sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
+      }
+      val classpath = urls.map(_.toString)
+
+      settings.classpath.value =
+        classpath.distinct.mkString(java.io.File.pathSeparator)
+      settings.embeddedDefaults(_runtimeClassloader)
 
+      this
+    }
+  }
   private val conf = new SparkConf()
     .setMaster("local[*]")
     .setAppName("Test Kernel")
@@ -65,6 +81,7 @@ class InterpreterActorSpecForIntegration extends TestKit(
 
   before {
     output.reset()
+    interpreter.init(mock[KernelLike])
     interpreter.start()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala b/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
index 28d08b1..2bdce57 100644
--- a/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
+++ b/kernel/src/test/scala/integration/PostProcessorSpecForIntegration.scala
@@ -18,8 +18,11 @@ package integration
 
 import java.io.OutputStream
 
+import com.ibm.spark.interpreter.Interpreter
+import com.ibm.spark.kernel.api.KernelLike
 import com.ibm.spark.kernel.interpreter.scala.{StandardSettingsProducer, StandardTaskManagerProducer, StandardSparkIMainProducer, ScalaInterpreter}
 import com.ibm.spark.kernel.protocol.v5.magic.PostProcessor
+import com.ibm.spark.utils.MultiOutputStream
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{BeforeAndAfter, Matchers, FunSpec}
 
@@ -32,10 +35,26 @@ class PostProcessorSpecForIntegration extends FunSpec with Matchers
   before {
     // TODO: Move instantiation and start of interpreter to a beforeAll
     //       for performance improvements
-    scalaInterpreter = new ScalaInterpreter(Nil, mock[OutputStream])
-      with StandardSparkIMainProducer
-      with StandardTaskManagerProducer
-      with StandardSettingsProducer
+    scalaInterpreter = new ScalaInterpreter {
+      override protected val multiOutputStream = MultiOutputStream(List(mock[OutputStream], lastResultOut))
+      override def init(kernel: KernelLike): Interpreter = {
+        settings = newSettings(List[String]())
+
+        val urls = _thisClassloader match {
+          case cl: java.net.URLClassLoader => cl.getURLs.toList
+          case a => // TODO: Should we really be using sys.error here?
+            sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
+        }
+        val classpath = urls.map(_.toString)
+
+        settings.classpath.value =
+          classpath.distinct.mkString(java.io.File.pathSeparator)
+        settings.embeddedDefaults(_runtimeClassloader)
+
+        this
+      }
+    }
+    scalaInterpreter.init(mock[KernelLike])
 
     scalaInterpreter.start()
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala b/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala
index 4f0412a..877b162 100644
--- a/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala
+++ b/kernel/src/test/scala/test/utils/SparkKernelDeployer.scala
@@ -62,50 +62,7 @@ object SparkKernelDeployer extends LogLike with MockitoSugar {
     }
   }
 
-  private trait ExposedComponentInitialization extends StandardComponentInitialization
-    with LogLike {
-    override protected def initializeInterpreter(config: Config): ScalaInterpreter
-      with StandardSparkIMainProducer with StandardTaskManagerProducer
-      with StandardSettingsProducer = {
-      val interpreterArgs = config.getStringList("interpreter_args").asScala.toList
-
-      logger.info("Constructing interpreter with arguments: " +
-        interpreterArgs.mkString(" "))
-      val interpreter = new ScalaInterpreter(interpreterArgs, mock[OutputStream])
-        with StandardSparkIMainProducer
-        with StandardTaskManagerProducer
-        with StandardSettingsProducer
-
-      logger.debug("Starting interpreter")
-      interpreter.start()
-
-      interpreter
-    }
-
-
 
-    /*
-    def reallyInitializeSparkContext(
-      config: Config,
-      actorLoader: ActorLoader,
-      kmBuilder: KMBuilder,
-      sparkConf: SparkConf
-    ): SparkContext = {
-      logger.debug("Constructing new Spark Context")
-      // TODO: Inject stream redirect headers in Spark dynamically
-      var sparkContext: SparkContext = null
-      val outStream = new KernelOutputStream(
-        actorLoader, KMBuilder(), global.ScheduledTaskManager.instance)
-      global.StreamState.setStreams(System.in, outStream, outStream)
-      global.StreamState.withStreams {
-        sparkContext = SparkContextProvider.sparkContext
-      }
-
-      sparkContext
-     }
-     */
-
-  }
 
   /**
    * Runs bare initialization, wrapping socket actors with test logic to
@@ -200,7 +157,7 @@ object SparkKernelDeployer extends LogLike with MockitoSugar {
     val kernelBootstrap =
       (new KernelBootstrap(new CommandLineOptions(Nil).toConfig)
         with ExposedBareInitialization
-        with ExposedComponentInitialization
+        with StandardComponentInitialization
         with StandardHandlerInitialization
         with StandardHookInitialization).initialize()
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/resources/compile/reference.conf
----------------------------------------------------------------------
diff --git a/resources/compile/reference.conf b/resources/compile/reference.conf
index 2aa6d63..d64d2b8 100644
--- a/resources/compile/reference.conf
+++ b/resources/compile/reference.conf
@@ -60,6 +60,7 @@ default_interpreter = "Scala"
 default_interpreter = ${?DEFAULT_INTERPRETER}
 
 default_interpreter_plugin = [
+  "Scala:com.ibm.spark.kernel.interpreter.scala.ScalaInterpreter",
   "PySpark:com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter",
   "SparkR:com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter",
   "SQL:com.ibm.spark.kernel.interpreter.sql.SqlInterpreter"

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/resources/test/reference.conf
----------------------------------------------------------------------
diff --git a/resources/test/reference.conf b/resources/test/reference.conf
index 7f3fcbb..6100469 100644
--- a/resources/test/reference.conf
+++ b/resources/test/reference.conf
@@ -58,6 +58,7 @@ default_interpreter = "Scala"
 default_interpreter = ${?DEFAULT_INTERPRETER}
 
 default_interpreter_plugin = [
+  "Scala:com.ibm.spark.kernel.interpreter.scala.ScalaInterpreter",
   "PySpark:com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter",
   "SparkR:com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter",
   "SQL:com.ibm.spark.kernel.interpreter.sql.SqlInterpreter"

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
index bccdf62..5d64b45 100644
--- a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
+++ b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
@@ -30,7 +30,7 @@ import com.ibm.spark.interpreter.imports.printers.{WrapperConsole, WrapperSystem
 import com.ibm.spark.kernel.api.{KernelLike, KernelOptions}
 import com.ibm.spark.utils.{MultiOutputStream, TaskManager}
 import org.apache.spark.SparkContext
-import org.apache.spark.repl.{SparkIMain, SparkJLineCompletion}
+import org.apache.spark.repl.{SparkCommandLine, SparkIMain, SparkJLineCompletion}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.{Await, Future}
@@ -42,44 +42,43 @@ import scala.tools.nsc.util.MergedClassPath
 import scala.tools.nsc.{Global, Settings, io}
 import scala.util.{Try => UtilTry}
 
-class ScalaInterpreter(
-  args: List[String],
-  out: OutputStream
-) extends Interpreter {
-  this: SparkIMainProducerLike
-    with TaskManagerProducerLike
-    with SettingsProducerLike =>
+class ScalaInterpreter() extends Interpreter {
 
   protected val logger = LoggerFactory.getLogger(this.getClass.getName)
 
   private val ExecutionExceptionName = "lastException"
-  val settings: Settings = newSettings(args)
+  protected var settings: Settings = null
 
-  private val _thisClassloader = this.getClass.getClassLoader
+  protected val _thisClassloader = this.getClass.getClassLoader
   protected val _runtimeClassloader =
     new URLClassLoader(Array(), _thisClassloader) {
       def addJar(url: URL) = this.addURL(url)
     }
 
-  /* Add scala.runtime libraries to interpreter classpath */ {
-    val urls = _thisClassloader match {
-      case cl: java.net.URLClassLoader => cl.getURLs.toList
-      case a => // TODO: Should we really be using sys.error here?
-        sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
-    }
-    val classpath = urls.map(_.toString)
-
-    settings.classpath.value =
-      classpath.distinct.mkString(java.io.File.pathSeparator)
-    settings.embeddedDefaults(_runtimeClassloader)
-  }
 
-  private val lastResultOut = new ByteArrayOutputStream()
-  private val multiOutputStream = MultiOutputStream(List(out, lastResultOut))
+  protected val lastResultOut = new ByteArrayOutputStream()
+  protected val multiOutputStream = MultiOutputStream(List(Console.out, lastResultOut))
   private var taskManager: TaskManager = _
   var sparkIMain: SparkIMain = _
   protected var jLineCompleter: SparkJLineCompletion = _
 
+  protected def newSparkIMain(
+    settings: Settings, out: JPrintWriter
+  ): SparkIMain = {
+    val s = new SparkIMain(settings, out)
+    s.initializeSynchronous()
+
+    s
+  }
+
+  private var maxInterpreterThreads:Int = TaskManager.DefaultMaximumWorkers
+
+  protected def newTaskManager(): TaskManager =
+    new TaskManager(maximumWorkers = maxInterpreterThreads)
+
+  protected def newSettings(args: List[String]): Settings =
+    new SparkCommandLine(args).settings
+
   /**
    * Adds jars to the runtime and compile time classpaths. Does not work with
    * directories or expanding star in a path.
@@ -191,6 +190,25 @@ class ScalaInterpreter(
   }
 
   override def init(kernel: KernelLike): Interpreter = {
+    import scala.collection.JavaConverters._
+    val args = kernel.config.getStringList("interpreter_args").asScala.toList
+    this.settings = newSettings(args)
+
+    val urls = _thisClassloader match {
+      case cl: java.net.URLClassLoader => cl.getURLs.toList
+      case a => // TODO: Should we really be using sys.error here?
+        sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
+    }
+    val classpath = urls.map(_.toString)
+
+    this.settings.classpath.value =
+      classpath.distinct.mkString(java.io.File.pathSeparator)
+    this.settings.embeddedDefaults(_runtimeClassloader)
+
+    maxInterpreterThreads = kernel.config.getInt("max_interpreter_threads")
+
+    start()
+
     doQuietly {
       bind(
         "kernel", "com.ibm.spark.kernel.api.Kernel",
@@ -201,6 +219,7 @@ class ScalaInterpreter(
     this
   }
 
+
   override def interrupt(): Interpreter = {
     require(sparkIMain != null && taskManager != null)
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala b/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala
index 34a75f7..0de546c 100644
--- a/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala
+++ b/scala-interpreter/src/test/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreterSpec.scala
@@ -86,10 +86,7 @@ class ScalaInterpreterSpec extends FunSpec
   }
 
   class StubbedStartInterpreter
-    extends ScalaInterpreter(mock[List[String]], mock[OutputStream])
-    with SparkIMainProducerLike
-    with TaskManagerProducerLike
-    with SettingsProducerLike
+    extends ScalaInterpreter
   {
     override def newSparkIMain(settings: Settings, out: JPrintWriter): SparkIMain = mockSparkIMain
     override def newTaskManager(): TaskManager = mockTaskManager

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/09ad06e0/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala b/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
index 490bff0..4032435 100644
--- a/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
+++ b/scala-interpreter/src/test/scala/integration/interpreter/scala/AddExternalJarMagicSpecForIntegration.scala
@@ -20,7 +20,9 @@ import java.io.{ByteArrayOutputStream, OutputStream}
 
 import com.ibm.spark.global.StreamState
 import com.ibm.spark.interpreter._
+import com.ibm.spark.kernel.api.KernelLike
 import com.ibm.spark.kernel.interpreter.scala.{ScalaInterpreter, StandardSettingsProducer, StandardSparkIMainProducer, StandardTaskManagerProducer}
+import com.ibm.spark.utils.MultiOutputStream
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
 
@@ -32,10 +34,26 @@ class AddExternalJarMagicSpecForIntegration
   private var interpreter: Interpreter = _
 
   before {
-    interpreter = new ScalaInterpreter(Nil, mock[OutputStream])
-      with StandardSparkIMainProducer
-      with StandardTaskManagerProducer
-      with StandardSettingsProducer
+    interpreter = new ScalaInterpreter {
+      override protected val multiOutputStream = MultiOutputStream(List(mock[OutputStream], lastResultOut))
+      override def init(kernel: KernelLike): Interpreter = {
+        settings = newSettings(List[String]())
+
+        val urls = _thisClassloader match {
+          case cl: java.net.URLClassLoader => cl.getURLs.toList
+          case a => // TODO: Should we really be using sys.error here?
+            sys.error("[SparkInterpreter] Unexpected class loader: " + a.getClass)
+        }
+        val classpath = urls.map(_.toString)
+
+        settings.classpath.value =
+          classpath.distinct.mkString(java.io.File.pathSeparator)
+        settings.embeddedDefaults(_runtimeClassloader)
+
+        this
+      }
+    }
+    interpreter.init(mock[KernelLike])
     interpreter.start()
 
     StreamState.setStreams(outputStream = outputResult)