Posted to commits@toree.apache.org by lr...@apache.org on 2016/01/11 22:01:44 UTC

[02/50] [abbrv] incubator-toree git commit: Begin Interpreter Plugin

Begin Interpreter Plugin


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/708180ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/708180ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/708180ad

Branch: refs/heads/master
Commit: 708180ad0d7dc10eda8ce3d4ab199ea8a8f67952
Parents: 9db161f
Author: Brian Burns <bb...@us.ibm.com>
Authored: Wed Nov 4 13:44:22 2015 -0500
Committer: Brian Burns <bb...@us.ibm.com>
Committed: Wed Nov 4 13:44:22 2015 -0500

----------------------------------------------------------------------
 .../com/ibm/spark/boot/CommandLineOptions.scala |  11 +-
 .../boot/layer/ComponentInitialization.scala    | 187 ++++---------------
 .../StandardComponentInitializationSpec.scala   |  76 +++-----
 .../scala/test/utils/DummyInterpreter.scala     | 105 +++++++++++
 4 files changed, 173 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/708180ad/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala b/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala
index 069cab7..27c5c68 100644
--- a/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala
+++ b/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala
@@ -96,6 +96,9 @@ class CommandLineOptions(args: Seq[String]) {
   private val _nosparkcontext =
     parser.accepts("nosparkcontext", "kernel should not create a spark context")
 
+  private val _plugins = parser.accepts(
+    "interpreter-plugin"
+  ).withRequiredArg().ofType(classOf[String])
 
   private val options = parser.parse(args.map(_.trim): _*)
 
@@ -152,7 +155,8 @@ class CommandLineOptions(args: Seq[String]) {
       "max_interpreter_threads" -> get(_max_interpreter_threads),
       "jar_dir" -> get(_jar_dir),
       "default_interpreter" -> get(_default_interpreter),
-      "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false))
+      "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false)),
+      "interpreter_plugins" -> interpreterPlugins
     ).flatMap(removeEmptyOptions).asInstanceOf[Map[String, AnyRef]].asJava)
 
     commandLineConfig.withFallback(profileConfig).withFallback(ConfigFactory.load)
@@ -173,6 +177,11 @@ class CommandLineOptions(args: Seq[String]) {
     }
   }
 
+  private def interpreterPlugins: Option[java.util.List[String]] = {
+    val p = getAll(_plugins)
+    p.map(_.asJava)
+  }
+
   /**
    * Prints the help message to the output stream provided.
    * @param out The output stream to direct the help message

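As the hunks above show, the kernel now accepts a repeatable "--interpreter-plugin"
option whose value is a name:className pair, surfaced in the resulting config under
"interpreter_plugins". A minimal sketch of exercising the flag, mirroring the spec
further down (DummyInterpreter is the test class added by this commit):

    import com.ibm.spark.boot.CommandLineOptions
    import scala.collection.JavaConverters._

    // Each --interpreter-plugin value pairs a short name with a fully
    // qualified class name, separated by a colon.
    val options = new CommandLineOptions(Seq(
      "--interpreter-plugin", "dummy:test.utils.DummyInterpreter",
      "--interpreter-plugin", "dummy2:test.utils.DummyInterpreter"
    ))

    // The parsed values land under the "interpreter_plugins" key of the
    // Typesafe Config built by toConfig.
    val plugins = options.toConfig.getStringList("interpreter_plugins").asScala
    // plugins == Seq("dummy:test.utils.DummyInterpreter",
    //                "dummy2:test.utils.DummyInterpreter")
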
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/708180ad/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
index 8ce6360..55b133d 100644
--- a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
+++ b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
@@ -16,6 +16,7 @@
 
 package com.ibm.spark.boot.layer
 
+import java.util
 import java.util.concurrent.ConcurrentHashMap
 
 import akka.actor.ActorRef
@@ -23,7 +24,7 @@ import com.ibm.spark.comm.{CommManager, KernelCommManager, CommRegistrar, CommSt
 import com.ibm.spark.dependencies.{DependencyDownloader, IvyDependencyDownloader}
 import com.ibm.spark.global
 import com.ibm.spark.interpreter._
-import com.ibm.spark.kernel.api.Kernel
+import com.ibm.spark.kernel.api.{KernelLike, Kernel}
 import com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter
 import com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter
 import com.ibm.spark.kernel.interpreter.scala.{TaskManagerProducerLike, StandardSparkIMainProducer, StandardSettingsProducer, ScalaInterpreter}
@@ -82,13 +83,6 @@ trait StandardComponentInitialization extends ComponentInitialization {
       initializeCommObjects(actorLoader)
     val interpreter = initializeInterpreter(config)
 
-    //val sparkContext = null
-    //val sparkContext = initializeSparkContext(
-    //  config, appName, actorLoader, interpreter)
-    //val sqlContext = null
-    //val sqlContext = initializeSqlContext(sparkContext)
-    //updateInterpreterWithSqlContext(sqlContext, interpreter)
-
     val dependencyDownloader = initializeDependencyDownloader(config)
     val magicLoader = initializeMagicLoader(
       config, interpreter, dependencyDownloader)
@@ -113,6 +107,11 @@ trait StandardComponentInitialization extends ComponentInitialization {
     //sqlInterpreter.start()
     kernel.data.put("SQL", sqlInterpreter)
 
+
+    val plugins = initializeInterpreterPlugins(kernel, config)
+
+    kernel.data.putAll(plugins.asJava)
+
     // Add Scala to available data map
     kernel.data.put("Scala", interpreter)
     val defaultInterpreter: Interpreter =
@@ -129,6 +128,8 @@ trait StandardComponentInitialization extends ComponentInitialization {
         case "sql" =>
           logger.info("Using SQL interpreter as default!")
           sqlInterpreter
+        case p if(kernel.data.containsKey(p)) =>
+          kernel.data.get(p).asInstanceOf[Interpreter]
         case unknown =>
           logger.warn(s"Unknown interpreter '$unknown'! Defaulting to Scala!")
           interpreter
@@ -141,6 +142,32 @@ trait StandardComponentInitialization extends ComponentInitialization {
 
   }
 
+  def initializeInterpreterPlugins(
+    kernel: KernelLike,
+    config: Config
+  ): Map[String, Interpreter] = {
+    val p = config
+      .getStringList("interpreter_plugins")
+      .listIterator().asScala
+
+    p.foldLeft(Map[String, Interpreter]())( (acc, v) => {
+      v.split(":") match {
+        case Array(name, className) =>
+          try {
+            acc + (name -> Class
+              .forName(className)
+              .getConstructor(classOf[KernelLike])
+              .newInstance(kernel)
+              .asInstanceOf[Interpreter])
+          }
+          catch {
+            case _:Throwable => acc
+          }
+        case _ => acc
+      }
+    })
+  }
+
   def initializeSparkContext(config:Config, kernel:Kernel, appName:String) = {
     if(!config.getBoolean("nosparkcontext")) {
       kernel.createSparkContext(config.getString("spark.master"), appName)
@@ -190,150 +217,6 @@ trait StandardComponentInitialization extends ComponentInitialization {
     interpreter
   }
 
-  // TODO: Think of a better way to test without exposing this
-  /*
-  protected[layer] def initializeSparkContext(
-    config: Config, appName: String, actorLoader: ActorLoader,
-    interpreter: Interpreter
-  ) = {
-    logger.debug("Creating Spark Configuration")
-    val conf = new SparkConf()
-
-    val master = config.getString("spark.master")
-    logger.info("Using " + master + " as Spark Master")
-    conf.setMaster(master)
-
-    logger.info("Setting deployMode to client")
-    conf.set("spark.submit.deployMode", "client")
-
-    logger.info("Using " + appName + " as Spark application name")
-    conf.setAppName(appName)
-
-    KeyValuePairUtils.stringToKeyValuePairSeq(
-      config.getString("spark_configuration")
-    ).foreach { keyValuePair =>
-      logger.info(s"Setting ${keyValuePair.key} to ${keyValuePair.value}")
-      Try(conf.set(keyValuePair.key, keyValuePair.value))
-    }
-
-    // TODO: Move SparkIMain to private and insert in a different way
-    logger.warn("Locked to Scala interpreter with SparkIMain until decoupled!")
-
-    // TODO: Construct class server outside of SparkIMain
-    logger.warn("Unable to control initialization of REPL class server!")
-    logger.info("REPL Class Server Uri: " + interpreter.classServerURI)
-    conf.set("spark.repl.class.uri", interpreter.classServerURI)
-
-    val sparkContext = reallyInitializeSparkContext(
-      config, actorLoader, KMBuilder(), conf
-    )
-
-    updateInterpreterWithSparkContext(
-      config, sparkContext, interpreter)
-
-    sparkContext
-  }
-
-  // TODO: Think of a better way to test without exposing this
-  protected[layer] def reallyInitializeSparkContext(
-    config: Config, actorLoader: ActorLoader, kmBuilder: KMBuilder,
-    sparkConf: SparkConf
-  ): SparkContext = {
-    logger.debug("Constructing new Spark Context")
-    // TODO: Inject stream redirect headers in Spark dynamically
-    var sparkContext: SparkContext = null
-    val outStream = new KernelOutputStream(
-      actorLoader, KMBuilder(), global.ScheduledTaskManager.instance,
-      sendEmptyOutput = config.getBoolean("send_empty_output")
-    )
-
-    // Update global stream state and use it to set the Console local variables
-    // for threads in the Spark threadpool
-    global.StreamState.setStreams(System.in, outStream, outStream)
-    global.StreamState.withStreams {
-      sparkContext = new SparkContext(sparkConf)
-    }
-
-    sparkContext
-  }
-
-  // TODO: Think of a better way to test without exposing this
-  protected[layer] def updateInterpreterWithSparkContext(
-    config: Config, sparkContext: SparkContext, interpreter: Interpreter
-  ) = {
-    interpreter.doQuietly {
-      logger.debug("Binding context into interpreter")
-      interpreter.bind(
-        "sc", "org.apache.spark.SparkContext",
-        sparkContext, List( """@transient"""))
-
-      // NOTE: This is needed because interpreter blows up after adding
-      //       dependencies to SparkContext and Interpreter before the
-      //       cluster has been used... not exactly sure why this is the case
-      // TODO: Investigate why the cluster has to be initialized in the kernel
-      //       to avoid the kernel's interpreter blowing up (must be done
-      //       inside the interpreter)
-      logger.debug("Initializing Spark cluster in interpreter")
-
-       interpreter.doQuietly {
-        interpreter.interpret("""
-        | val $toBeNulled = {
-        | var $toBeNulled = sc.emptyRDD.collect()
-        | $toBeNulled = null
-        |  }
-        |
-        |""".stripMargin)
-      }
-    }
-
-    // Add ourselves as a dependency
-    // TODO: Provide ability to point to library as commandline argument
-    // TODO: Provide better method to determine if can add ourselves
-    // TODO: Avoid duplicating request for master twice (initializeSparkContext
-    //       also does this)
-    val master = config.getString("spark.master")
-    // If in local mode, do not need to add our jars as dependencies
-    if (!master.toLowerCase.startsWith("local")) {
-      @inline def getJarPathFor(klass: Class[_]): String =
-        klass.getProtectionDomain.getCodeSource.getLocation.getPath
-
-      // TODO: Provide less hard-coded solution in case additional dependencies
-      //       are added or classes are refactored to different projects
-      val jarPaths = Seq(
-        // Macro project
-        classOf[com.ibm.spark.annotations.Experimental],
-
-        // Protocol project
-        classOf[com.ibm.spark.kernel.protocol.v5.KernelMessage],
-
-        // Communication project
-        classOf[com.ibm.spark.communication.SocketManager],
-
-        // Kernel-api project
-        classOf[com.ibm.spark.kernel.api.KernelLike],
-
-        // Scala-interpreter project
-        classOf[com.ibm.spark.kernel.interpreter.scala.ScalaInterpreter],
-
-        // PySpark-interpreter project
-        classOf[com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter],
-
-        // SparkR-interpreter project
-        classOf[com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter],
-
-        // Kernel project
-        classOf[com.ibm.spark.boot.KernelBootstrap]
-      ).map(getJarPathFor)
-
-      logger.info("Adding kernel jars to cluster:\n- " +
-        jarPaths.mkString("\n- "))
-      jarPaths.foreach(sparkContext.addJar)
-    } else {
-      logger.info("Running in local mode! Not adding self as dependency!")
-    }
-  }
-  */
-
   protected[layer] def initializeSqlContext(sparkContext: SparkContext) = {
     val sqlContext: SQLContext = try {
       logger.info("Attempting to create Hive Context")

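The initializeInterpreterPlugins method introduced above splits each
"interpreter_plugins" entry on ':' into a name and a fully qualified class name,
loads that class reflectively, and invokes its single-argument KernelLike
constructor; entries that fail to parse or to construct are silently dropped.
A rough standalone restatement of that pattern (an illustration, not the
committed code):

    import com.ibm.spark.interpreter.Interpreter
    import com.ibm.spark.kernel.api.KernelLike
    import scala.util.Try

    def loadPlugin(kernel: KernelLike, spec: String): Option[(String, Interpreter)] =
      spec.split(":") match {
        case Array(name, className) =>
          // Look up the class and call its (KernelLike) constructor, as the
          // initialization code does; any failure simply yields no plugin.
          Try {
            name -> Class
              .forName(className)
              .getConstructor(classOf[KernelLike])
              .newInstance(kernel)
              .asInstanceOf[Interpreter]
          }.toOption
        case _ => None
      }
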
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/708180ad/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala b/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala
index f6a9235..68ee0cc 100644
--- a/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala
+++ b/kernel/src/test/scala/com/ibm/spark/boot/layer/StandardComponentInitializationSpec.scala
@@ -16,8 +16,9 @@
 
 package com.ibm.spark.boot.layer
 
-import com.ibm.spark.boot.KernelBootstrap
+import com.ibm.spark.boot.{CommandLineOptions, KernelBootstrap}
 import com.ibm.spark.interpreter.Interpreter
+import com.ibm.spark.kernel.api.KernelLike
 import com.ibm.spark.kernel.protocol.v5.KMBuilder
 import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
 import com.ibm.spark.utils.LogLike
@@ -29,6 +30,8 @@ import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
 
+import scala.collection.mutable
+import scala.collection.JavaConverters._
 class StandardComponentInitializationSpec extends FunSpec with Matchers
   with MockitoSugar with BeforeAndAfter
 {
@@ -38,6 +41,7 @@ class StandardComponentInitializationSpec extends FunSpec with Matchers
   private var mockActorLoader: ActorLoader = _
   private var mockSparkContext: SparkContext = _
   private var mockInterpreter: Interpreter = _
+  private var mockKernel: KernelLike = _
   private var spyComponentInitialization: StandardComponentInitialization = _
 
   private class TestComponentInitialization
@@ -48,67 +52,33 @@ class StandardComponentInitializationSpec extends FunSpec with Matchers
     mockActorLoader = mock[ActorLoader]
     mockSparkContext = mock[SparkContext]
     mockInterpreter = mock[Interpreter]
+    mockKernel = mock[KernelLike]
 
     spyComponentInitialization = spy(new TestComponentInitialization())
   }
 
-  /*
   describe("StandardComponentInitialization") {
-    describe("when spark.master is set in config") {
-      it("should set spark.master in SparkConf") {
-        val expected = "some value"
-        doReturn(expected).when(mockConfig).getString("spark.master")
-        doReturn("").when(mockConfig).getString("spark_configuration")
-
-        // Stub out other helper methods to avoid long init process and to
-        // avoid failure when creating SparkContext
-        doReturn(mockSparkContext).when(spyComponentInitialization)
-          .reallyInitializeSparkContext(
-            any[Config], any[ActorLoader], any[KMBuilder], any[SparkConf])
-        doNothing().when(spyComponentInitialization)
-          .updateInterpreterWithSparkContext(
-            any[Config], any[SparkContext], any[Interpreter])
-
-        // Provide stub for interpreter classServerURI since also executed
-        doReturn("").when(mockInterpreter).classServerURI
-
-        val sparkContext = spyComponentInitialization.initializeSparkContext(
-          mockConfig, TestAppName, mockActorLoader, mockInterpreter)
-
-        val sparkConf = {
-          val sparkConfCaptor = ArgumentCaptor.forClass(classOf[SparkConf])
-          verify(spyComponentInitialization).reallyInitializeSparkContext(
-            any[Config], any[ActorLoader], any[KMBuilder],
-            sparkConfCaptor.capture()
-          )
-          sparkConfCaptor.getValue
-        }
-
-        sparkConf.get("spark.master") should be (expected)
+    describe("#initializeInterpreterPlugins") {
+      it("should return a map with the DummyInterpreter") {
+        val conf = new CommandLineOptions(List(
+          "--interpreter-plugin", "dummy:test.utils.DummyInterpreter",
+          "--interpreter-plugin", "dummy2:test.utils.DummyInterpreter"
+        )).toConfig
+
+        val m = spyComponentInitialization
+          .initializeInterpreterPlugins(mockKernel, conf)
+
+        m.get("dummy") should not be None
+        m.get("dummy2") should not be None
       }
+      it("should return an empty map") {
+        val conf = new CommandLineOptions(List()).toConfig
 
-      it("should not add ourselves as a jar if spark.master is not local") {
-        doReturn("local[*]").when(mockConfig).getString("spark.master")
-
-        spyComponentInitialization.updateInterpreterWithSparkContext(
-          mockConfig, mockSparkContext, mockInterpreter)
-        verify(mockSparkContext, never()).addJar(anyString())
-      }
-
-      it("should add ourselves as a jar if spark.master is not local") {
-        doReturn("notlocal").when(mockConfig).getString("spark.master")
-
-        // TODO: This is going to be outdated when we determine a way to
-        //       re-include all jars
-        val expected =
-          com.ibm.spark.SparkKernel.getClass.getProtectionDomain
-            .getCodeSource.getLocation.getPath
+        val m = spyComponentInitialization
+          .initializeInterpreterPlugins(mockKernel, conf)
 
-        spyComponentInitialization.updateInterpreterWithSparkContext(
-          mockConfig, mockSparkContext, mockInterpreter)
-        verify(mockSparkContext).addJar(expected)
+        m.isEmpty shouldBe true
       }
     }
   }
-  */
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/708180ad/kernel/src/test/scala/test/utils/DummyInterpreter.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/test/utils/DummyInterpreter.scala b/kernel/src/test/scala/test/utils/DummyInterpreter.scala
new file mode 100644
index 0000000..1ab8cac
--- /dev/null
+++ b/kernel/src/test/scala/test/utils/DummyInterpreter.scala
@@ -0,0 +1,105 @@
+package test.utils
+
+import java.net.URL
+
+import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter}
+import com.ibm.spark.interpreter.Results.Result
+import com.ibm.spark.kernel.api.KernelLike
+
+import scala.tools.nsc.interpreter.{OutputStream, InputStream}
+
+class DummyInterpreter(kernel: KernelLike) extends Interpreter {
+  /**
+   * Starts the interpreter, initializing any internal state.
+   * @return A reference to the interpreter
+   */
+  override def start(): Interpreter = ???
+
+  /**
+   * Executes body and will not print anything to the console during the execution
+   * @param body The function to execute
+   * @tparam T The return type of body
+   * @return The return value of body
+   */
+  override def doQuietly[T](body: => T): T = ???
+
+  /**
+   * Stops the interpreter, removing any previous internal state.
+   * @return A reference to the interpreter
+   */
+  override def stop(): Interpreter = ???
+
+  /**
+   * Adds external jars to the internal classpaths of the interpreter.
+   * @param jars The list of jar locations
+   */
+  override def addJars(jars: URL*): Unit = ???
+
+  /**
+   * @return Returns a string to reference the URI of where the interpreted class files are created
+   */
+  override def classServerURI: String = ???
+
+  /**
+   * Returns the name of the variable created from the last execution.
+   * @return Some String name if a variable was created, otherwise None
+   */
+  override def lastExecutionVariableName: Option[String] = ???
+
+  /**
+   * Mask the Console and System objects with our wrapper implementations
+   * and dump the Console methods into the public namespace (similar to
+   * the Predef approach).
+   * @param in The new input stream
+   * @param out The new output stream
+   * @param err The new error stream
+   */
+  override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
+
+  /**
+   * Returns the class loader used by this interpreter.
+   * @return The runtime class loader used by this interpreter
+   */
+  override def classLoader: ClassLoader = ???
+
+  /**
+   * Retrieves the contents of the variable with the provided name from the
+   * interpreter.
+   * @param variableName The name of the variable whose contents to read
+   * @return An option containing the variable contents or None if the
+   *         variable does not exist
+   */
+  override def read(variableName: String): Option[AnyRef] = ???
+
+  /**
+   * Interrupts the current code being interpreted.
+   * @return A reference to the interpreter
+   */
+  override def interrupt(): Interpreter = ???
+
+  /**
+   * Binds a variable in the interpreter to a value.
+   * @param variableName The name to expose the value in the interpreter
+   * @param typeName The type of the variable, must be the fully qualified class name
+   * @param value The value of the variable binding
+   * @param modifiers Any annotation, scoping modifiers, etc on the variable
+   */
+  override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
+
+  /**
+   * Executes the provided code with the option to silence output.
+   * @param code The code to execute
+   * @param silent Whether or not to execute the code silently (no output)
+   * @return The success/failure of the interpretation and the output from the
+   *         execution or the failure
+   */
+  override def interpret(code: String, silent: Boolean): (Result, Either[ExecuteOutput, ExecuteFailure]) = ???
+
+  /**
+   * Attempts to perform code completion via the <TAB> command.
+   * @param code The current cell to complete
+   * @param pos The cursor position
+   * @return The cursor position and list of possible completions
+   */
+  override def completion(code: String, pos: Int): (Int, List[String]) = ???
+}
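
DummyInterpreter stubs every Interpreter member with ??? and exists so the spec
above has a class exposing the (KernelLike) constructor that the reflective loader
requires. A hypothetical user-supplied plugin would follow the same shape; the class
below is illustrative only, placed in test.utils and reusing DummyInterpreter as a
base purely to keep the sketch short:

    package test.utils

    import com.ibm.spark.interpreter.Interpreter
    import com.ibm.spark.kernel.api.KernelLike

    // The single (KernelLike) constructor is the contract relied on by
    // initializeInterpreterPlugins via getConstructor(classOf[KernelLike]).
    class EchoInterpreter(kernel: KernelLike) extends DummyInterpreter(kernel) {
      override def start(): Interpreter = this
      override def stop(): Interpreter = this
    }

Such a class could then be registered with
"--interpreter-plugin echo:test.utils.EchoInterpreter" (both the short name and the
class are hypothetical).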