Posted to commits@toree.apache.org by lr...@apache.org on 2016/01/11 22:01:44 UTC
[02/50] [abbrv] incubator-toree git commit: Begin Interpreter Plugin
Begin Interpreter Plugin

Adds a repeatable --interpreter-plugin command-line option ("name:className"), loads each named plugin reflectively through its KernelLike constructor during component initialization, allows a loaded plugin to be selected as the default interpreter, and removes the long commented-out SparkContext initialization code. Includes a DummyInterpreter test fixture and unit tests for the plugin loader.
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/708180ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/708180ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/708180ad
Branch: refs/heads/master
Commit: 708180ad0d7dc10eda8ce3d4ab199ea8a8f67952
Parents: 9db161f
Author: Brian Burns <bb...@us.ibm.com>
Authored: Wed Nov 4 13:44:22 2015 -0500
Committer: Brian Burns <bb...@us.ibm.com>
Committed: Wed Nov 4 13:44:22 2015 -0500
----------------------------------------------------------------------
.../com/ibm/spark/boot/CommandLineOptions.scala | 11 +-
.../boot/layer/ComponentInitialization.scala | 187 ++++---------------
.../StandardComponentInitializationSpec.scala | 76 +++-----
.../scala/test/utils/DummyInterpreter.scala | 105 +++++++++++
4 files changed, 173 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/708180ad/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala b/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala
index 069cab7..27c5c68 100644
--- a/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala
+++ b/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala
@@ -96,6 +96,9 @@ class CommandLineOptions(args: Seq[String]) {
private val _nosparkcontext =
parser.accepts("nosparkcontext", "kernel should not create a spark context")
+ private val _plugins = parser.accepts(
+ "interpreter-plugin"
+ ).withRequiredArg().ofType(classOf[String])
private val options = parser.parse(args.map(_.trim): _*)
@@ -152,7 +155,8 @@ class CommandLineOptions(args: Seq[String]) {
"max_interpreter_threads" -> get(_max_interpreter_threads),
"jar_dir" -> get(_jar_dir),
"default_interpreter" -> get(_default_interpreter),
- "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false))
+ "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false)),
+ "interpreter_plugins" -> interpreterPlugins
).flatMap(removeEmptyOptions).asInstanceOf[Map[String, AnyRef]].asJava)
commandLineConfig.withFallback(profileConfig).withFallback(ConfigFactory.load)
@@ -173,6 +177,11 @@ class CommandLineOptions(args: Seq[String]) {
}
}
+ private def interpreterPlugins: Option[java.util.List[String]] = {
+ val p = getAll(_plugins)
+ p.map(_.asJava)
+ }
+
/**
* Prints the help message to the output stream provided.
* @param out The output stream to direct the help message
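The new option may be given more than once; each value is a plugin spec of the form "name:className", and the collected specs are exposed to the rest of the boot layer under the interpreter_plugins config key. A minimal sketch of how the flag surfaces in the resulting config, reusing the specs from the test added further down (the class names are only illustrative):

    import com.ibm.spark.boot.CommandLineOptions

    // Two plugin specs of the form "name:className"; the names key the loaded
    // interpreters, the class names are resolved reflectively at boot time.
    val config = new CommandLineOptions(List(
      "--interpreter-plugin", "dummy:test.utils.DummyInterpreter",
      "--interpreter-plugin", "dummy2:test.utils.DummyInterpreter"
    )).toConfig

    // interpreter_plugins is stored as a java.util.List[String]
    val specs = config.getStringList("interpreter_plugins")
    // specs: ["dummy:test.utils.DummyInterpreter", "dummy2:test.utils.DummyInterpreter"]
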
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/708180ad/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
index 8ce6360..55b133d 100644
--- a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
+++ b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
@@ -16,6 +16,7 @@
package com.ibm.spark.boot.layer
+import java.util
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorRef
@@ -23,7 +24,7 @@ import com.ibm.spark.comm.{CommManager, KernelCommManager, CommRegistrar, CommSt
import com.ibm.spark.dependencies.{DependencyDownloader, IvyDependencyDownloader}
import com.ibm.spark.global
import com.ibm.spark.interpreter._
-import com.ibm.spark.kernel.api.Kernel
+import com.ibm.spark.kernel.api.{KernelLike, Kernel}
import com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter
import com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter
import com.ibm.spark.kernel.interpreter.scala.{TaskManagerProducerLike, StandardSparkIMainProducer, StandardSettingsProducer, ScalaInterpreter}
@@ -82,13 +83,6 @@ trait StandardComponentInitialization extends ComponentInitialization {
initializeCommObjects(actorLoader)
val interpreter = initializeInterpreter(config)
- //val sparkContext = null
- //val sparkContext = initializeSparkContext(
- // config, appName, actorLoader, interpreter)
- //val sqlContext = null
- //val sqlContext = initializeSqlContext(sparkContext)
- //updateInterpreterWithSqlContext(sqlContext, interpreter)
-
val dependencyDownloader = initializeDependencyDownloader(config)
val magicLoader = initializeMagicLoader(
config, interpreter, dependencyDownloader)
@@ -113,6 +107,11 @@ trait StandardComponentInitialization extends ComponentInitialization {
//sqlInterpreter.start()
kernel.data.put("SQL", sqlInterpreter)
+
+ val plugins = initializeInterpreterPlugins(kernel, config)
+
+ kernel.data.putAll(plugins.asJava)
+
// Add Scala to available data map
kernel.data.put("Scala", interpreter)
val defaultInterpreter: Interpreter =
@@ -129,6 +128,8 @@ trait StandardComponentInitialization extends ComponentInitialization {
case "sql" =>
logger.info("Using SQL interpreter as default!")
sqlInterpreter
+ case p if(kernel.data.containsKey(p)) =>
+ kernel.data.get(p).asInstanceOf[Interpreter]
case unknown =>
logger.warn(s"Unknown interpreter '$unknown'! Defaulting to Scala!")
interpreter
@@ -141,6 +142,32 @@ trait StandardComponentInitialization extends ComponentInitialization {
}
+ def initializeInterpreterPlugins(
+ kernel: KernelLike,
+ config: Config
+ ): Map[String, Interpreter] = {
+ val p = config
+ .getStringList("interpreter_plugins")
+ .listIterator().asScala
+
+ p.foldLeft(Map[String, Interpreter]())( (acc, v) => {
+ v.split(":") match {
+ case Array(name, className) =>
+ try {
+ acc + (name -> Class
+ .forName(className)
+ .getConstructor(classOf[KernelLike])
+ .newInstance(kernel)
+ .asInstanceOf[Interpreter])
+ }
+ catch {
+ case _:Throwable => acc
+ }
+ case _ => acc
+ }
+ })
+ }
+
def initializeSparkContext(config:Config, kernel:Kernel, appName:String) = {
if(!config.getBoolean("nosparkcontext")) {
kernel.createSparkContext(config.getString("spark.master"), appName)
@@ -190,150 +217,6 @@ trait StandardComponentInitialization extends ComponentInitialization {
interpreter
}
- // TODO: Think of a better way to test without exposing this
- /*
- protected[layer] def initializeSparkContext(
- config: Config, appName: String, actorLoader: ActorLoader,
- interpreter: Interpreter
- ) = {
- logger.debug("Creating Spark Configuration")
- val conf = new SparkConf()
-
- val master = config.getString("spark.master")
- logger.info("Using " + master + " as Spark Master")
- conf.setMaster(master)
-
- logger.info("Setting deployMode to client")
- conf.set("spark.submit.deployMode", "client")
-
- logger.info("Using " + appName + " as Spark application name")
- conf.setAppName(appName)
-
- KeyValuePairUtils.stringToKeyValuePairSeq(
- config.getString("spark_configuration")
- ).foreach { keyValuePair =>
- logger.info(s"Setting ${keyValuePair.key} to ${keyValuePair.value}")
- Try(conf.set(keyValuePair.key, keyValuePair.value))
- }
-
- // TODO: Move SparkIMain to private and insert in a different way
- logger.warn("Locked to Scala interpreter with SparkIMain until decoupled!")
-
- // TODO: Construct class server outside of SparkIMain
- logger.warn("Unable to control initialization of REPL class server!")
- logger.info("REPL Class Server Uri: " + interpreter.classServerURI)
- conf.set("spark.repl.class.uri", interpreter.classServerURI)
-
- val sparkContext = reallyInitializeSparkContext(
- config, actorLoader, KMBuilder(), conf
- )
-
- updateInterpreterWithSparkContext(
- config, sparkContext, interpreter)
-
- sparkContext
- }
-
- // TODO: Think of a better way to test without exposing this
- protected[layer] def reallyInitializeSparkContext(
- config: Config, actorLoader: ActorLoader, kmBuilder: KMBuilder,
- sparkConf: SparkConf
- ): SparkContext = {
- logger.debug("Constructing new Spark Context")
- // TODO: Inject stream redirect headers in Spark dynamically
- var sparkContext: SparkContext = null
- val outStream = new KernelOutputStream(
- actorLoader, KMBuilder(), global.ScheduledTaskManager.instance,
- sendEmptyOutput = config.getBoolean("send_empty_output")
- )
-
- // Update global stream state and use it to set the Console local variables
- // for threads in the Spark threadpool
- global.StreamState.setStreams(System.in, outStream, outStream)
- global.StreamState.withStreams {
- sparkContext = new SparkContext(sparkConf)
- }
-
- sparkContext
- }
-
- // TODO: Think of a better way to test without exposing this
- protected[layer] def updateInterpreterWithSparkContext(
- config: Config, sparkContext: SparkContext, interpreter: Interpreter
- ) = {
- interpreter.doQuietly {
- logger.debug("Binding context into interpreter")
- interpreter.bind(
- "sc", "org.apache.spark.SparkContext",
- sparkContext, List( """@transient"""))
-
- // NOTE: This is needed because interpreter blows up after adding
- // dependencies to SparkContext and Interpreter before the
- // cluster has been used... not exactly sure why this is the case
- // TODO: Investigate why the cluster has to be initialized in the kernel
- // to avoid the kernel's interpreter blowing up (must be done
- // inside the interpreter)
- logger.debug("Initializing Spark cluster in interpreter")
-
- interpreter.doQuietly {
- interpreter.interpret("""
- | val $toBeNulled = {
- | var $toBeNulled = sc.emptyRDD.collect()
- | $toBeNulled = null
- | }
- |
- |""".stripMargin)
- }
- }
-
- // Add ourselves as a dependency
- // TODO: Provide ability to point to library as commandline argument
- // TODO: Provide better method to determine if can add ourselves
- // TODO: Avoid duplicating request for master twice (initializeSparkContext
- // also does this)
- val master = config.getString("spark.master")
- // If in local mode, do not need to add our jars as dependencies
- if (!master.toLowerCase.startsWith("local")) {
- @inline def getJarPathFor(klass: Class[_]): String =
- klass.getProtectionDomain.getCodeSource.getLocation.getPath
-
- // TODO: Provide less hard-coded solution in case additional dependencies
- // are added or classes are refactored to different projects
- val jarPaths = Seq(
- // Macro project
- classOf[com.ibm.spark.annotations.Experimental],
-
- // Protocol project
- classOf[com.ibm.spark.kernel.protocol.v5.KernelMessage],
-
- // Communication project
- classOf[com.ibm.spark.communication.SocketManager],
-
- // Kernel-api project
- classOf[com.ibm.spark.kernel.api.KernelLike],
-
- // Scala-interpreter project
- classOf[com.ibm.spark.kernel.interpreter.scala.ScalaInterpreter],
-
- // PySpark-interpreter project
- classOf[com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter],
-
- // SparkR-interpreter project
- classOf[com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter],
-
- // Kernel project
- classOf[com.ibm.spark.boot.KernelBootstrap]
- ).map(getJarPathFor)
-
- logger.info("Adding kernel jars to cluster:\n- " +
- jarPaths.mkString("\n- "))
- jarPaths.foreach(sparkContext.addJar)
- } else {
- logger.info("Running in local mode! Not adding self as dependency!")
- }
- }
- */
-
protected[layer] def initializeSqlContext(sparkContext: SparkContext) = {
val sqlContext: SQLContext = try {
logger.info("Attempting to create Hive Context")
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/708180ad/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala b/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala
index f6a9235..68ee0cc 100644
--- a/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala
+++ b/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala
@@ -16,8 +16,9 @@
package com.ibm.spark.boot.layer
-import com.ibm.spark.boot.KernelBootstrap
+import com.ibm.spark.boot.{CommandLineOptions, KernelBootstrap}
import com.ibm.spark.interpreter.Interpreter
+import com.ibm.spark.kernel.api.KernelLike
import com.ibm.spark.kernel.protocol.v5.KMBuilder
import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
import com.ibm.spark.utils.LogLike
@@ -29,6 +30,8 @@ import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
+import scala.collection.mutable
+import scala.collection.JavaConverters._
class StandardComponentInitializationSpec extends FunSpec with Matchers
with MockitoSugar with BeforeAndAfter
{
@@ -38,6 +41,7 @@ class StandardComponentInitializationSpec extends FunSpec with Matchers
private var mockActorLoader: ActorLoader = _
private var mockSparkContext: SparkContext = _
private var mockInterpreter: Interpreter = _
+ private var mockKernel: KernelLike = _
private var spyComponentInitialization: StandardComponentInitialization = _
private class TestComponentInitialization
@@ -48,67 +52,33 @@ class StandardComponentInitializationSpec extends FunSpec with Matchers
mockActorLoader = mock[ActorLoader]
mockSparkContext = mock[SparkContext]
mockInterpreter = mock[Interpreter]
+ mockKernel = mock[KernelLike]
spyComponentInitialization = spy(new TestComponentInitialization())
}
- /*
describe("StandardComponentInitialization") {
- describe("when spark.master is set in config") {
- it("should set spark.master in SparkConf") {
- val expected = "some value"
- doReturn(expected).when(mockConfig).getString("spark.master")
- doReturn("").when(mockConfig).getString("spark_configuration")
-
- // Stub out other helper methods to avoid long init process and to
- // avoid failure when creating SparkContext
- doReturn(mockSparkContext).when(spyComponentInitialization)
- .reallyInitializeSparkContext(
- any[Config], any[ActorLoader], any[KMBuilder], any[SparkConf])
- doNothing().when(spyComponentInitialization)
- .updateInterpreterWithSparkContext(
- any[Config], any[SparkContext], any[Interpreter])
-
- // Provide stub for interpreter classServerURI since also executed
- doReturn("").when(mockInterpreter).classServerURI
-
- val sparkContext = spyComponentInitialization.initializeSparkContext(
- mockConfig, TestAppName, mockActorLoader, mockInterpreter)
-
- val sparkConf = {
- val sparkConfCaptor = ArgumentCaptor.forClass(classOf[SparkConf])
- verify(spyComponentInitialization).reallyInitializeSparkContext(
- any[Config], any[ActorLoader], any[KMBuilder],
- sparkConfCaptor.capture()
- )
- sparkConfCaptor.getValue
- }
-
- sparkConf.get("spark.master") should be (expected)
+ describe("#initializeInterpreterPlugins") {
+ it("should return a map with the DummyInterpreter") {
+ val conf = new CommandLineOptions(List(
+ "--interpreter-plugin", "dummy:test.utils.DummyInterpreter",
+ "--interpreter-plugin", "dummy2:test.utils.DummyInterpreter"
+ )).toConfig
+
+ val m = spyComponentInitialization
+ .initializeInterpreterPlugins(mockKernel, conf)
+
+ m.get("dummy") should not be None
+ m.get("dummy2") should not be None
}
+ it("should return an empty map") {
+ val conf = new CommandLineOptions(List()).toConfig
- it("should not add ourselves as a jar if spark.master is not local") {
- doReturn("local[*]").when(mockConfig).getString("spark.master")
-
- spyComponentInitialization.updateInterpreterWithSparkContext(
- mockConfig, mockSparkContext, mockInterpreter)
- verify(mockSparkContext, never()).addJar(anyString())
- }
-
- it("should add ourselves as a jar if spark.master is not local") {
- doReturn("notlocal").when(mockConfig).getString("spark.master")
-
- // TODO: This is going to be outdated when we determine a way to
- // re-include all jars
- val expected =
- com.ibm.spark.SparkKernel.getClass.getProtectionDomain
- .getCodeSource.getLocation.getPath
+ val m = spyComponentInitialization
+ .initializeInterpreterPlugins(mockKernel, conf)
- spyComponentInitialization.updateInterpreterWithSparkContext(
- mockConfig, mockSparkContext, mockInterpreter)
- verify(mockSparkContext).addJar(expected)
+ m.isEmpty shouldBe true
}
}
}
- */
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/708180ad/kernel/src/test/scala/test/utils/DummyInterpreter.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/test/utils/DummyInterpreter.scala b/kernel/src/test/scala/test/utils/DummyInterpreter.scala
new file mode 100644
index 0000000..1ab8cac
--- /dev/null
+++ b/kernel/src/test/scala/test/utils/DummyInterpreter.scala
@@ -0,0 +1,105 @@
+package test.utils
+
+import java.net.URL
+
+import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter}
+import com.ibm.spark.interpreter.Results.Result
+import com.ibm.spark.kernel.api.KernelLike
+
+import scala.tools.nsc.interpreter.{OutputStream, InputStream}
+
+class DummyInterpreter(kernel: KernelLike) extends Interpreter {
+ /**
+ * Starts the interpreter, initializing any internal state.
+ * @return A reference to the interpreter
+ */
+ override def start(): Interpreter = ???
+
+ /**
+ * Executes body and will not print anything to the console during the execution
+ * @param body The function to execute
+ * @tparam T The return type of body
+ * @return The return value of body
+ */
+ override def doQuietly[T](body: => T): T = ???
+
+ /**
+ * Stops the interpreter, removing any previous internal state.
+ * @return A reference to the interpreter
+ */
+ override def stop(): Interpreter = ???
+
+ /**
+ * Adds external jars to the internal classpaths of the interpreter.
+ * @param jars The list of jar locations
+ */
+ override def addJars(jars: URL*): Unit = ???
+
+ /**
+ * @return Returns a string to reference the URI of where the interpreted class files are created
+ */
+ override def classServerURI: String = ???
+
+ /**
+ * Returns the name of the variable created from the last execution.
+ * @return Some String name if a variable was created, otherwise None
+ */
+ override def lastExecutionVariableName: Option[String] = ???
+
+ /**
+ * Mask the Console and System objects with our wrapper implementations
+ * and dump the Console methods into the public namespace (similar to
+ * the Predef approach).
+ * @param in The new input stream
+ * @param out The new output stream
+ * @param err The new error stream
+ */
+ override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
+
+ /**
+ * Returns the class loader used by this interpreter.
+ * @return The runtime class loader used by this interpreter
+ */
+ override def classLoader: ClassLoader = ???
+
+ /**
+ * Retrieves the contents of the variable with the provided name from the
+ * interpreter.
+ * @param variableName The name of the variable whose contents to read
+ * @return An option containing the variable contents or None if the
+ * variable does not exist
+ */
+ override def read(variableName: String): Option[AnyRef] = ???
+
+ /**
+ * Interrupts the current code being interpreted.
+ * @return A reference to the interpreter
+ */
+ override def interrupt(): Interpreter = ???
+
+ /**
+ * Binds a variable in the interpreter to a value.
+ * @param variableName The name to expose the value in the interpreter
+ * @param typeName The type of the variable, must be the fully qualified class name
+ * @param value The value of the variable binding
+ * @param modifiers Any annotation, scoping modifiers, etc on the variable
+ */
+ override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
+
+ /**
+ * Executes the provided code with the option to silence output.
+ * @param code The code to execute
+ * @param silent Whether or not to execute the code silently (no output)
+ * @return The success/failure of the interpretation and the output from the
+ * execution or the failure
+ */
+ override def interpret(code: String, silent: Boolean): (Result, Either[ExecuteOutput, ExecuteFailure]) = ???
+
+ /**
+ * Attempts to perform code completion via the <TAB> command.
+ * @param code The current cell to complete
+ * @param pos The cursor position
+ * @return The cursor position and list of possible completions
+ */
+ override def completion(code: String, pos: Int): (Int, List[String]) = ???
+}
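
DummyInterpreter exists only to satisfy the reflective loading path exercised by the new test; every member is left unimplemented (???). For illustration only, a hypothetical plugin could reuse it as a base class; the one contract the loader relies on is the single KernelLike constructor parameter, and any member not overridden would still throw scala.NotImplementedError:

    package test.utils

    import com.ibm.spark.interpreter.Interpreter
    import com.ibm.spark.kernel.api.KernelLike

    // Hypothetical plugin (not part of this commit) reusing DummyInterpreter
    // as a base. It would be registered with:
    //   --interpreter-plugin noop:test.utils.NoopInterpreter
    class NoopInterpreter(kernel: KernelLike) extends DummyInterpreter(kernel) {
      override def start(): Interpreter = this
      override def stop(): Interpreter = this
    }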