You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:10 UTC
[02/51] [abbrv] incubator-toree git commit: Moved scala files to new
locations based on new package
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala
deleted file mode 100644
index 81cb86e..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-/**
- * Provides reflective access into the backend R component that is not
- * publically accessible.
- */
-class ReflectiveRBackend {
- private val rBackendClass = Class.forName("org.apache.spark.api.r.RBackend")
- private val rBackendInstance = rBackendClass.newInstance()
-
- /**
- * Initializes the underlying RBackend service.
- *
- * @return The port used by the service
- */
- def init(): Int = {
- val runMethod = rBackendClass.getDeclaredMethod("init")
-
- runMethod.invoke(rBackendInstance).asInstanceOf[Int]
- }
-
- /** Blocks until the service has finished. */
- def run(): Unit = {
- val runMethod = rBackendClass.getDeclaredMethod("run")
-
- runMethod.invoke(rBackendInstance)
- }
-
- /** Closes the underlying RBackend service. */
- def close(): Unit = {
- val runMethod = rBackendClass.getDeclaredMethod("close")
-
- runMethod.invoke(rBackendInstance)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala
deleted file mode 100644
index 44fa203..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, JavaSparkContextProducerLike, SQLContextProducerLike}
-import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge}
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-
-/**
- * Represents constants for the SparkR bridge.
- */
-object SparkRBridge {
- /** Represents the maximum amount of code that can be queued for Python. */
- val MaxQueuedCode = 500
-
- /** Contains the bridge used by the current R process. */
- @volatile private var _sparkRBridge: Option[SparkRBridge] = None
-
- /** Allows kernel to set bridge dynamically. */
- private[sparkr] def sparkRBridge_=(newSparkRBridge: SparkRBridge): Unit = {
- _sparkRBridge = Some(newSparkRBridge)
- }
-
- /** Clears the bridge currently hosted statically. */
- private[sparkr] def reset(): Unit = _sparkRBridge = None
-
- /** Must be exposed in a static location for RBackend to access. */
- def sparkRBridge: SparkRBridge = {
- assert(_sparkRBridge.nonEmpty, "SparkRBridge has not been initialized!")
- _sparkRBridge.get
- }
-
- /**
- * Creates a new SparkRBridge instance.
- *
- * @param brokerState The container of broker state to expose
- * @param kernel The kernel API to expose through the bridge
- *
- * @return The new SparkR bridge
- */
- def apply(
- brokerState: BrokerState,
- kernel: KernelLike
- ): SparkRBridge = {
- new SparkRBridge(
- _brokerState = brokerState,
- _kernel = kernel
- ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
- }
-}
-
-/**
- * Represents the API available to SparkR to act as the bridge for data
- * between the JVM and R.
- *
- * @param _brokerState The container of broker state to expose
- * @param _kernel The kernel API to expose through the bridge
- */
-class SparkRBridge private (
- private val _brokerState: BrokerState,
- private val _kernel: KernelLike
-) extends BrokerBridge(_brokerState, _kernel) {
- override val brokerName: String = "SparkR"
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala
deleted file mode 100644
index 0be8f61..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerException
-
-/**
- * Represents a generic SparkR exception.
- *
- * @param message The message to associate with the exception
- */
-class SparkRException(message: String) extends BrokerException(message)
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
deleted file mode 100644
index 45fe03c..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import java.net.URL
-
-import com.ibm.spark.interpreter.Results.Result
-import com.ibm.spark.interpreter._
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.tools.nsc.interpreter.{InputStream, OutputStream}
-
-/**
- * Represents an interpreter interface to SparkR. Requires a properly-set
- * SPARK_HOME pointing to a binary distribution (needs packaged SparkR library)
- * and an implementation of R on the path.
- *
- */
-class SparkRInterpreter(
-) extends Interpreter {
- private val logger = LoggerFactory.getLogger(this.getClass)
- private var _kernel: KernelLike = _
-
- // TODO: Replace hard-coded maximum queue count
- /** Represents the state used by this interpreter's R instance. */
- private lazy val sparkRState = new SparkRState(500)
-
- /** Represents the bridge used by this interpreter's R instance. */
- private lazy val sparkRBridge = SparkRBridge(
- sparkRState,
- _kernel
- )
-
- /** Represents the interface for R to talk to JVM Spark components. */
- private lazy val rBackend = new ReflectiveRBackend
-
- /** Represents the process handler used for the SparkR process. */
- private lazy val sparkRProcessHandler: SparkRProcessHandler =
- new SparkRProcessHandler(
- sparkRBridge,
- restartOnFailure = true,
- restartOnCompletion = true
- )
-
- private lazy val sparkRService = new SparkRService(
- rBackend,
- sparkRBridge,
- sparkRProcessHandler
- )
- private lazy val sparkRTransformer = new SparkRTransformer
-
- override def init(kernel: KernelLike): Interpreter = {
- _kernel = kernel
- this
- }
-
- /**
- * Executes the provided code with the option to silence output.
- * @param code The code to execute
- * @param silent Whether or not to execute the code silently (no output)
- * @return The success/failure of the interpretation and the output from the
- * execution or the failure
- */
- override def interpret(code: String, silent: Boolean):
- (Result, Either[ExecuteOutput, ExecuteFailure]) =
- {
- if (!sparkRService.isRunning) sparkRService.start()
-
- val futureResult = sparkRTransformer.transformToInterpreterResult(
- sparkRService.submitCode(code)
- )
-
- Await.result(futureResult, Duration.Inf)
- }
-
- /**
- * Starts the interpreter, initializing any internal state.
- * @return A reference to the interpreter
- */
- override def start(): Interpreter = {
- sparkRService.start()
-
- this
- }
-
- /**
- * Stops the interpreter, removing any previous internal state.
- * @return A reference to the interpreter
- */
- override def stop(): Interpreter = {
- sparkRService.stop()
-
- this
- }
-
- /**
- * Returns the class loader used by this interpreter.
- *
- * @return The runtime class loader used by this interpreter
- */
- override def classLoader: ClassLoader = this.getClass.getClassLoader
-
- // Unsupported (but can be invoked)
- override def lastExecutionVariableName: Option[String] = None
-
- // Unsupported (but can be invoked)
- override def read(variableName: String): Option[AnyRef] = None
-
- // Unsupported (but can be invoked)
- override def completion(code: String, pos: Int): (Int, List[String]) =
- (pos, Nil)
-
- // Unsupported
- override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
-
- // Unsupported
- override def classServerURI: String = ""
-
- // Unsupported (but can be invoked)
- override def bindSparkContext(sparkContext: SparkContext): Unit = {}
-
- // Unsupported (but can be invoked)
- override def bindSqlContext(sqlContext: SQLContext): Unit = {}
-
- // Unsupported
- override def interrupt(): Interpreter = ???
-
- // Unsupported
- override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
-
- // Unsupported
- override def addJars(jars: URL*): Unit = ???
-
- // Unsupported
- override def doQuietly[T](body: => T): T = ???
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala
deleted file mode 100644
index 2429dc4..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerProcess
-import scala.collection.JavaConverters._
-
-/**
- * Represents the R process used to evaluate SparkR code.
- *
- * @param sparkRBridge The bridge to use to retrieve kernel output streams
- * and the Spark version to be verified
- * @param sparkRProcessHandler The handler to use when the process fails or
- * completes
- * @param port The port to provide to the SparkR process to use to connect
- * back to the JVM
- */
-class SparkRProcess(
- private val sparkRBridge: SparkRBridge,
- private val sparkRProcessHandler: SparkRProcessHandler,
- private val port: Int
-) extends BrokerProcess(
- processName = "Rscript",
- entryResource = "kernelR/sparkr_runner.R",
- otherResources = Seq("kernelR/sparkr_runner_utils.R", "sparkr_bundle.tar.gz"),
- brokerBridge = sparkRBridge,
- brokerProcessHandler = sparkRProcessHandler,
- arguments = Seq(
- "--default-packages=datasets,utils,grDevices,graphics,stats,methods"
- )
-) {
- override val brokerName: String = "SparkR"
- private val sparkHome = Option(System.getenv("SPARK_HOME"))
- .orElse(Option(System.getProperty("spark.home")))
-
- assert(sparkHome.nonEmpty, "SparkR process requires Spark Home to be set!")
-
- /**
- * Creates a new process environment to be used for environment variable
- * retrieval by the new process.
- *
- * @return The map of environment variables and their respective values
- */
- override protected def newProcessEnvironment(): Map[String, String] = {
- val baseEnvironment = super.newProcessEnvironment()
-
- // Note: Adding the new map values should override the old ones
- baseEnvironment ++ Map(
- "SPARK_HOME" -> sparkHome.get,
- "EXISTING_SPARKR_BACKEND_PORT" -> port.toString
- )
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala
deleted file mode 100644
index d33d265..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerProcessHandler
-
-/**
- * Represents the handler for events triggered by the SparkR process.
- *
- * @param sparkRBridge The bridge to reset when the process fails or completes
- * @param restartOnFailure If true, restarts the process if it fails
- * @param restartOnCompletion If true, restarts the process if it completes
- */
-class SparkRProcessHandler(
- private val sparkRBridge: SparkRBridge,
- private val restartOnFailure: Boolean,
- private val restartOnCompletion: Boolean
-) extends BrokerProcessHandler(
- sparkRBridge,
- restartOnFailure,
- restartOnCompletion
-) {
- override val brokerName: String = "SparkR"
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala
deleted file mode 100644
index 71731a8..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import java.util.concurrent.{TimeUnit, Semaphore}
-
-import com.ibm.spark.interpreter.broker.BrokerService
-import com.ibm.spark.kernel.api.KernelLike
-import com.ibm.spark.kernel.interpreter.sparkr.SparkRTypes.{Code, CodeResults}
-import org.apache.spark.SparkContext
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.{future, Future}
-
-/**
- * Represents the service that provides the high-level interface between the
- * JVM and R.
- *
- * @param rBackend The backend to start to communicate between the JVM and R
- * @param sparkRBridge The bridge to use for communication between the JVM and R
- * @param sparkRProcessHandler The handler used for events that occur with the
- * SparkR process
- */
-class SparkRService(
- private val rBackend: ReflectiveRBackend,
- private val sparkRBridge: SparkRBridge,
- private val sparkRProcessHandler: SparkRProcessHandler
-) extends BrokerService {
- private val logger = LoggerFactory.getLogger(this.getClass)
- @volatile private var rBackendPort: Int = -1
- @volatile private var _isRunning: Boolean = false
- override def isRunning: Boolean = _isRunning
-
- /** Represents the process used to execute R code via the bridge. */
- private lazy val sparkRProcess: SparkRProcess = {
- val p = new SparkRProcess(
- sparkRBridge,
- sparkRProcessHandler,
- rBackendPort
- )
-
- // Update handlers to correctly reset and restart the process
- sparkRProcessHandler.setResetMethod(message => {
- p.stop()
- sparkRBridge.state.reset(message)
- })
- sparkRProcessHandler.setRestartMethod(() => p.start())
-
- p
- }
-
- /** Starts the SparkR service. */
- override def start(): Unit = {
- logger.debug("Initializing statically-accessible SparkR bridge")
- SparkRBridge.sparkRBridge = sparkRBridge
-
- val initialized = new Semaphore(0)
- import scala.concurrent.ExecutionContext.Implicits.global
- val rBackendRun = future {
- logger.debug("Initializing RBackend")
- rBackendPort = rBackend.init()
- logger.debug(s"RBackend running on port $rBackendPort")
- initialized.release()
- logger.debug("Running RBackend")
- rBackend.run()
- logger.debug("RBackend has finished")
- }
-
- // Wait for backend to start before starting R process to connect
- val backendTimeout =
- sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
- if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
- // Start the R process used to execute code
- logger.debug("Launching process to execute R code")
- sparkRProcess.start()
- _isRunning = true
- } else {
- // Unable to initialize, so throw an exception
- throw new SparkRException(
- s"Unable to initialize R backend in $backendTimeout seconds!")
- }
- }
-
- /**
- * Submits code to the SparkR service to be executed and return a result.
- *
- * @param code The code to execute
- *
- * @return The result as a future to eventually return
- */
- override def submitCode(code: Code): Future[CodeResults] = {
- sparkRBridge.state.pushCode(code)
- }
-
- /** Stops the running SparkR service. */
- override def stop(): Unit = {
- // Stop the R process used to execute code
- sparkRProcess.stop()
-
- // Stop the server used as an entrypoint for R
- rBackend.close()
-
- // Clear the bridge
- SparkRBridge.reset()
-
- _isRunning = false
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala
deleted file mode 100644
index 60f67a3..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerState
-
-/**
- * Represents the state structure of SparkR.
- *
- * @param maxQueuedCode The maximum amount of code to support being queued
- * at the same time for SparkR execution
- */
-class SparkRState(private val maxQueuedCode: Int)
- extends BrokerState(maxQueuedCode)
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala
deleted file mode 100644
index 45c44c0..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerTransformer
-
-/**
- * Represents the transformer used by SparkR.
- */
-class SparkRTransformer extends BrokerTransformer
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala
deleted file mode 100644
index 11f33c0..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerTypesProvider
-
-/**
- * Represents all types associated with the SparkR interface.
- */
-object SparkRTypes extends BrokerTypesProvider
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala
deleted file mode 100644
index 872d376..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter
-
-import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise}
-
-/**
- * Contains aliases to broker types.
- */
-package object sparkr {
- /**
- * Represents a promise made regarding the completion of SparkR code
- * execution.
- */
- type SparkRPromise = BrokerPromise
-
- /**
- * Represents a block of SparkR code to be evaluated.
- */
- type SparkRCode = BrokerCode
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala
deleted file mode 100644
index 7eba136..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.magic.builtin
-
-import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
-import com.ibm.spark.kernel.interpreter.sparkr.{SparkRInterpreter, SparkRException}
-import com.ibm.spark.kernel.protocol.v5.MIMEType
-import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
-import com.ibm.spark.magic.dependencies.IncludeKernel
-
-/**
- * Represents the magic interface to use the SparkR interpreter.
- */
-class SparkR extends CellMagic with IncludeKernel {
- override def execute(code: String): CellMagicOutput = {
- val sparkR = kernel.interpreter("SparkR")
-
- if (sparkR.isEmpty || sparkR.get == null)
- throw new SparkRException("SparkR is not available!")
-
- sparkR.get match {
- case sparkRInterpreter: SparkRInterpreter =>
- val (_, output) = sparkRInterpreter.interpret(code)
- output match {
- case Left(executeOutput) =>
- CellMagicOutput(MIMEType.PlainText -> executeOutput)
- case Right(executeFailure) => executeFailure match {
- case executeAborted: ExecuteAborted =>
- throw new SparkRException("SparkR code was aborted!")
- case executeError: ExecuteError =>
- throw new SparkRException(executeError.value)
- }
- }
- case otherInterpreter =>
- val className = otherInterpreter.getClass.getName
- throw new SparkRException(s"Invalid SparkR interpreter: $className")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala
new file mode 100644
index 0000000..81cb86e
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+/**
+ * Provides reflective access into the backend R component that is not
+ * publicly accessible.
+ */
+class ReflectiveRBackend {
+ private val rBackendClass = Class.forName("org.apache.spark.api.r.RBackend")
+ private val rBackendInstance = rBackendClass.newInstance()
+
+ /**
+ * Initializes the underlying RBackend service.
+ *
+ * @return The port used by the service
+ */
+ def init(): Int = {
+ val runMethod = rBackendClass.getDeclaredMethod("init")
+
+ runMethod.invoke(rBackendInstance).asInstanceOf[Int]
+ }
+
+ /** Blocks until the service has finished. */
+ def run(): Unit = {
+ val runMethod = rBackendClass.getDeclaredMethod("run")
+
+ runMethod.invoke(rBackendInstance)
+ }
+
+ /** Closes the underlying RBackend service. */
+ def close(): Unit = {
+ val runMethod = rBackendClass.getDeclaredMethod("close")
+
+ runMethod.invoke(rBackendInstance)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
new file mode 100644
index 0000000..44fa203
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, JavaSparkContextProducerLike, SQLContextProducerLike}
+import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge}
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+
+/**
+ * Represents constants for the SparkR bridge.
+ */
+object SparkRBridge {
+ /** Represents the maximum amount of code that can be queued for SparkR. */
+ val MaxQueuedCode = 500
+
+ /** Contains the bridge used by the current R process. */
+ @volatile private var _sparkRBridge: Option[SparkRBridge] = None
+
+ /** Allows kernel to set bridge dynamically. */
+ private[sparkr] def sparkRBridge_=(newSparkRBridge: SparkRBridge): Unit = {
+ _sparkRBridge = Some(newSparkRBridge)
+ }
+
+ /** Clears the bridge currently hosted statically. */
+ private[sparkr] def reset(): Unit = _sparkRBridge = None
+
+ /** Must be exposed in a static location for RBackend to access. */
+ def sparkRBridge: SparkRBridge = {
+ assert(_sparkRBridge.nonEmpty, "SparkRBridge has not been initialized!")
+ _sparkRBridge.get
+ }
+
+ /**
+ * Creates a new SparkRBridge instance.
+ *
+ * @param brokerState The container of broker state to expose
+ * @param kernel The kernel API to expose through the bridge
+ *
+ * @return The new SparkR bridge
+ */
+ def apply(
+ brokerState: BrokerState,
+ kernel: KernelLike
+ ): SparkRBridge = {
+ new SparkRBridge(
+ _brokerState = brokerState,
+ _kernel = kernel
+ ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
+ }
+}
+
+/**
+ * Represents the API available to SparkR to act as the bridge for data
+ * between the JVM and R.
+ *
+ * @param _brokerState The container of broker state to expose
+ * @param _kernel The kernel API to expose through the bridge
+ */
+class SparkRBridge private (
+ private val _brokerState: BrokerState,
+ private val _kernel: KernelLike
+) extends BrokerBridge(_brokerState, _kernel) {
+ override val brokerName: String = "SparkR"
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala
new file mode 100644
index 0000000..0be8f61
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerException
+
+/**
+ * Represents a generic SparkR exception.
+ *
+ * @param message The message to associate with the exception
+ */
+class SparkRException(message: String) extends BrokerException(message)
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
new file mode 100644
index 0000000..45fe03c
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import java.net.URL
+
+import com.ibm.spark.interpreter.Results.Result
+import com.ibm.spark.interpreter._
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.tools.nsc.interpreter.{InputStream, OutputStream}
+
+/**
+ * Represents an interpreter interface to SparkR. Requires a properly-set
+ * SPARK_HOME pointing to a binary distribution (needs packaged SparkR library)
+ * and an implementation of R on the path.
+ *
+ */
+class SparkRInterpreter(
+) extends Interpreter {
+ private val logger = LoggerFactory.getLogger(this.getClass)
+ private var _kernel: KernelLike = _
+
+ // TODO: Replace hard-coded maximum queue count
+ /** Represents the state used by this interpreter's R instance. */
+ private lazy val sparkRState = new SparkRState(500)
+
+ /** Represents the bridge used by this interpreter's R instance. */
+ private lazy val sparkRBridge = SparkRBridge(
+ sparkRState,
+ _kernel
+ )
+
+ /** Represents the interface for R to talk to JVM Spark components. */
+ private lazy val rBackend = new ReflectiveRBackend
+
+ /** Represents the process handler used for the SparkR process. */
+ private lazy val sparkRProcessHandler: SparkRProcessHandler =
+ new SparkRProcessHandler(
+ sparkRBridge,
+ restartOnFailure = true,
+ restartOnCompletion = true
+ )
+
+ private lazy val sparkRService = new SparkRService(
+ rBackend,
+ sparkRBridge,
+ sparkRProcessHandler
+ )
+ private lazy val sparkRTransformer = new SparkRTransformer
+
+ override def init(kernel: KernelLike): Interpreter = {
+ _kernel = kernel
+ this
+ }
+
+ /**
+ * Executes the provided code with the option to silence output.
+ * @param code The code to execute
+ * @param silent Whether or not to execute the code silently (no output)
+ * @return The success/failure of the interpretation and the output from the
+ * execution or the failure
+ */
+ override def interpret(code: String, silent: Boolean):
+ (Result, Either[ExecuteOutput, ExecuteFailure]) =
+ {
+ if (!sparkRService.isRunning) sparkRService.start()
+
+ val futureResult = sparkRTransformer.transformToInterpreterResult(
+ sparkRService.submitCode(code)
+ )
+
+ Await.result(futureResult, Duration.Inf)
+ }
+
+ /**
+ * Starts the interpreter, initializing any internal state.
+ * @return A reference to the interpreter
+ */
+ override def start(): Interpreter = {
+ sparkRService.start()
+
+ this
+ }
+
+ /**
+ * Stops the interpreter, removing any previous internal state.
+ * @return A reference to the interpreter
+ */
+ override def stop(): Interpreter = {
+ sparkRService.stop()
+
+ this
+ }
+
+ /**
+ * Returns the class loader used by this interpreter.
+ *
+ * @return The runtime class loader used by this interpreter
+ */
+ override def classLoader: ClassLoader = this.getClass.getClassLoader
+
+ // Unsupported (but can be invoked)
+ override def lastExecutionVariableName: Option[String] = None
+
+ // Unsupported (but can be invoked)
+ override def read(variableName: String): Option[AnyRef] = None
+
+ // Unsupported (but can be invoked)
+ override def completion(code: String, pos: Int): (Int, List[String]) =
+ (pos, Nil)
+
+ // Unsupported
+ override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
+
+ // Unsupported (but can be invoked)
+ override def classServerURI: String = ""
+
+ // Unsupported (but can be invoked)
+ override def bindSparkContext(sparkContext: SparkContext): Unit = {}
+
+ // Unsupported (but can be invoked)
+ override def bindSqlContext(sqlContext: SQLContext): Unit = {}
+
+ // Unsupported
+ override def interrupt(): Interpreter = ???
+
+ // Unsupported
+ override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
+
+ // Unsupported
+ override def addJars(jars: URL*): Unit = ???
+
+ // Unsupported
+ override def doQuietly[T](body: => T): T = ???
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
new file mode 100644
index 0000000..2429dc4
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerProcess
+import scala.collection.JavaConverters._
+
+/**
+ * Represents the R process used to evaluate SparkR code.
+ *
+ * @param sparkRBridge The bridge to use to retrieve kernel output streams
+ * and the Spark version to be verified
+ * @param sparkRProcessHandler The handler to use when the process fails or
+ * completes
+ * @param port The port to provide to the SparkR process to use to connect
+ * back to the JVM
+ */
+class SparkRProcess(
+ private val sparkRBridge: SparkRBridge,
+ private val sparkRProcessHandler: SparkRProcessHandler,
+ private val port: Int
+) extends BrokerProcess(
+ processName = "Rscript",
+ entryResource = "kernelR/sparkr_runner.R",
+ otherResources = Seq("kernelR/sparkr_runner_utils.R", "sparkr_bundle.tar.gz"),
+ brokerBridge = sparkRBridge,
+ brokerProcessHandler = sparkRProcessHandler,
+ arguments = Seq(
+ "--default-packages=datasets,utils,grDevices,graphics,stats,methods"
+ )
+) {
+ override val brokerName: String = "SparkR"
+ private val sparkHome = Option(System.getenv("SPARK_HOME"))
+ .orElse(Option(System.getProperty("spark.home")))
+
+ assert(sparkHome.nonEmpty, "SparkR process requires Spark Home to be set!")
+
+ /**
+ * Creates a new process environment to be used for environment variable
+ * retrieval by the new process.
+ *
+ * @return The map of environment variables and their respective values
+ */
+ override protected def newProcessEnvironment(): Map[String, String] = {
+ val baseEnvironment = super.newProcessEnvironment()
+
+ // Note: Adding the new map values should override the old ones
+ baseEnvironment ++ Map(
+ "SPARK_HOME" -> sparkHome.get,
+ "EXISTING_SPARKR_BACKEND_PORT" -> port.toString
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala
new file mode 100644
index 0000000..d33d265
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerProcessHandler
+
+/**
+ * Represents the handler for events triggered by the SparkR process.
+ *
+ * @param sparkRBridge The bridge to reset when the process fails or completes
+ * @param restartOnFailure If true, restarts the process if it fails
+ * @param restartOnCompletion If true, restarts the process if it completes
+ */
+class SparkRProcessHandler(
+ private val sparkRBridge: SparkRBridge,
+ private val restartOnFailure: Boolean,
+ private val restartOnCompletion: Boolean
+) extends BrokerProcessHandler(
+ sparkRBridge,
+ restartOnFailure,
+ restartOnCompletion
+) {
+ override val brokerName: String = "SparkR"
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
new file mode 100644
index 0000000..71731a8
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import java.util.concurrent.{TimeUnit, Semaphore}
+
+import com.ibm.spark.interpreter.broker.BrokerService
+import com.ibm.spark.kernel.api.KernelLike
+import com.ibm.spark.kernel.interpreter.sparkr.SparkRTypes.{Code, CodeResults}
+import org.apache.spark.SparkContext
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.{future, Future}
+
+/**
+ * Represents the service that provides the high-level interface between the
+ * JVM and R.
+ *
+ * @param rBackend The backend to start to communicate between the JVM and R
+ * @param sparkRBridge The bridge to use for communication between the JVM and R
+ * @param sparkRProcessHandler The handler used for events that occur with the
+ * SparkR process
+ */
+class SparkRService(
+ private val rBackend: ReflectiveRBackend,
+ private val sparkRBridge: SparkRBridge,
+ private val sparkRProcessHandler: SparkRProcessHandler
+) extends BrokerService {
+ private val logger = LoggerFactory.getLogger(this.getClass)
+ @volatile private var rBackendPort: Int = -1
+ @volatile private var _isRunning: Boolean = false
+ override def isRunning: Boolean = _isRunning
+
+ /** Represents the process used to execute R code via the bridge. */
+ private lazy val sparkRProcess: SparkRProcess = {
+ val p = new SparkRProcess(
+ sparkRBridge,
+ sparkRProcessHandler,
+ rBackendPort
+ )
+
+ // Update handlers to correctly reset and restart the process
+ sparkRProcessHandler.setResetMethod(message => {
+ p.stop()
+ sparkRBridge.state.reset(message)
+ })
+ sparkRProcessHandler.setRestartMethod(() => p.start())
+
+ p
+ }
+
+ /** Starts the SparkR service. */
+ override def start(): Unit = {
+ logger.debug("Initializing statically-accessible SparkR bridge")
+ SparkRBridge.sparkRBridge = sparkRBridge
+
+ val initialized = new Semaphore(0)
+ import scala.concurrent.ExecutionContext.Implicits.global
+ val rBackendRun = future {
+ logger.debug("Initializing RBackend")
+ rBackendPort = rBackend.init()
+ logger.debug(s"RBackend running on port $rBackendPort")
+ initialized.release()
+ logger.debug("Running RBackend")
+ rBackend.run()
+ logger.debug("RBackend has finished")
+ }
+
+ // Wait for backend to start before starting R process to connect
+ val backendTimeout =
+ sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
+ if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
+ // Start the R process used to execute code
+ logger.debug("Launching process to execute R code")
+ sparkRProcess.start()
+ _isRunning = true
+ } else {
+ // Unable to initialize, so throw an exception
+ throw new SparkRException(
+ s"Unable to initialize R backend in $backendTimeout seconds!")
+ }
+ }
+
+ /**
+ * Submits code to the SparkR service to be executed and return a result.
+ *
+ * @param code The code to execute
+ *
+ * @return The result as a future to eventually return
+ */
+ override def submitCode(code: Code): Future[CodeResults] = {
+ sparkRBridge.state.pushCode(code)
+ }
+
+ /** Stops the running SparkR service. */
+ override def stop(): Unit = {
+ // Stop the R process used to execute code
+ sparkRProcess.stop()
+
+ // Stop the server used as an entrypoint for R
+ rBackend.close()
+
+ // Clear the bridge
+ SparkRBridge.reset()
+
+ _isRunning = false
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala
new file mode 100644
index 0000000..60f67a3
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerState
+
+/**
+ * Represents the state structure of SparkR.
+ *
+ * @param maxQueuedCode The maximum amount of code to support being queued
+ * at the same time for SparkR execution
+ */
+class SparkRState(private val maxQueuedCode: Int)
+ extends BrokerState(maxQueuedCode)
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala
new file mode 100644
index 0000000..45c44c0
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerTransformer
+
+/**
+ * Represents the transformer used by SparkR.
+ */
+class SparkRTransformer extends BrokerTransformer
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala
new file mode 100644
index 0000000..11f33c0
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerTypesProvider
+
+/**
+ * Represents all types associated with the SparkR interface.
+ */
+object SparkRTypes extends BrokerTypesProvider
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala
new file mode 100644
index 0000000..872d376
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter
+
+import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise}
+
+/**
+ * Contains aliases to broker types.
+ */
+package object sparkr {
+ /**
+ * Represents a promise made regarding the completion of SparkR code
+ * execution.
+ */
+ type SparkRPromise = BrokerPromise
+
+ /**
+ * Represents a block of SparkR code to be evaluated.
+ */
+ type SparkRCode = BrokerCode
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala
new file mode 100644
index 0000000..7eba136
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.magic.builtin
+
+import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
+import com.ibm.spark.kernel.interpreter.sparkr.{SparkRInterpreter, SparkRException}
+import com.ibm.spark.kernel.protocol.v5.MIMEType
+import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
+import com.ibm.spark.magic.dependencies.IncludeKernel
+
+/**
+ * Represents the magic interface to use the SparkR interpreter.
+ */
+class SparkR extends CellMagic with IncludeKernel {
+ override def execute(code: String): CellMagicOutput = {
+ val sparkR = kernel.interpreter("SparkR")
+
+ if (sparkR.isEmpty || sparkR.get == null)
+ throw new SparkRException("SparkR is not available!")
+
+ sparkR.get match {
+ case sparkRInterpreter: SparkRInterpreter =>
+ val (_, output) = sparkRInterpreter.interpret(code)
+ output match {
+ case Left(executeOutput) =>
+ CellMagicOutput(MIMEType.PlainText -> executeOutput)
+ case Right(executeFailure) => executeFailure match {
+ case executeAborted: ExecuteAborted =>
+ throw new SparkRException("SparkR code was aborted!")
+ case executeError: ExecuteError =>
+ throw new SparkRException(executeError.value)
+ }
+ }
+ case otherInterpreter =>
+ val className = otherInterpreter.getClass.getName
+ throw new SparkRException(s"Invalid SparkR interpreter: $className")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala
deleted file mode 100644
index 2c0b4d5..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import com.ibm.spark.interpreter.broker.BrokerException
-
-/**
- * Represents a generic SQL exception.
- *
- * @param message The message to associate with the exception
- */
-class SqlException(message: String) extends BrokerException(message)
-
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
deleted file mode 100644
index 889d4a6..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import java.net.URL
-
-import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter}
-import com.ibm.spark.interpreter.Results.Result
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-
-import scala.concurrent.duration._
-import scala.concurrent.Await
-import scala.tools.nsc.interpreter.{OutputStream, InputStream}
-
-/**
- * Represents an interpreter interface to Spark SQL.
- */
-class SqlInterpreter() extends Interpreter {
- private var _kernel: KernelLike = _
- private lazy val sqlService = new SqlService(_kernel)
- private lazy val sqlTransformer = new SqlTransformer
-
- override def init(kernel: KernelLike): Interpreter = {
- _kernel = kernel
- this
- }
-
- /**
- * Executes the provided code with the option to silence output.
- * @param code The code to execute
- * @param silent Whether or not to execute the code silently (no output)
- * @return The success/failure of the interpretation and the output from the
- * execution or the failure
- */
- override def interpret(code: String, silent: Boolean):
- (Result, Either[ExecuteOutput, ExecuteFailure]) =
- {
- if (!sqlService.isRunning) sqlService.start()
-
- val futureResult = sqlTransformer.transformToInterpreterResult(
- sqlService.submitCode(code)
- )
-
- Await.result(futureResult, Duration.Inf)
- }
-
- /**
- * Starts the interpreter, initializing any internal state.
- * @return A reference to the interpreter
- */
- override def start(): Interpreter = {
- sqlService.start()
-
- this
- }
-
- /**
- * Stops the interpreter, removing any previous internal state.
- * @return A reference to the interpreter
- */
- override def stop(): Interpreter = {
- sqlService.stop()
-
- this
- }
-
- /**
- * Returns the class loader used by this interpreter.
- *
- * @return The runtime class loader used by this interpreter
- */
- override def classLoader: ClassLoader = this.getClass.getClassLoader
-
- // Unsupported (but can be invoked)
- override def lastExecutionVariableName: Option[String] = None
-
- // Unsupported (but can be invoked)
- override def read(variableName: String): Option[AnyRef] = None
-
- // Unsupported (but can be invoked)
- override def completion(code: String, pos: Int): (Int, List[String]) =
- (pos, Nil)
-
- // Unsupported
- override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
-
- // Unsupported
- override def classServerURI: String = ""
-
- // Unsupported (but can be invoked)
- override def bindSparkContext(sparkContext: SparkContext): Unit = {}
-
- // Unsupported (but can be invoked)
- override def bindSqlContext(sqlContext: SQLContext): Unit = {}
-
- // Unsupported
- override def interrupt(): Interpreter = ???
-
- // Unsupported
- override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
-
- // Unsupported
- override def addJars(jars: URL*): Unit = ???
-
- // Unsupported
- override def doQuietly[T](body: => T): T = ???
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala
deleted file mode 100644
index 2f2fed6..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import com.ibm.spark.kernel.api.KernelLike
-import java.io.ByteArrayOutputStream
-
-import com.ibm.spark.interpreter.broker.BrokerService
-import com.ibm.spark.kernel.interpreter.sql.SqlTypes._
-import org.apache.spark.sql.SQLContext
-
-import scala.concurrent.{Future, future}
-
-/**
- * Represents the service that provides the high-level interface between the
- * JVM and Spark SQL.
- *
- * @param kernel The SQL Context of Apache Spark to use to perform SQL
- * queries
- */
-class SqlService(private val kernel: KernelLike) extends BrokerService {
- import scala.concurrent.ExecutionContext.Implicits.global
-
- @volatile private var _isRunning: Boolean = false
- override def isRunning: Boolean = _isRunning
-
- /**
- * Submits code to the broker service to be executed and return a result.
- *
- * @param code The code to execute
- *
- * @return The result as a future to eventually return
- */
- override def submitCode(code: Code): Future[CodeResults] = future {
- println(s"Executing: '${code.trim}'")
- val result = kernel.sqlContext.sql(code.trim)
-
- // TODO: There is an internal method used for show called showString that
- // supposedly is only for the Python API, look into why
- val stringOutput = {
- val outputStream = new ByteArrayOutputStream()
- Console.withOut(outputStream) {
- // TODO: Provide some way to change the number of records shown
- result.show(10)
- }
- outputStream.toString("UTF-8")
- }
-
- stringOutput
- }
-
- /** Stops the running broker service. */
- override def stop(): Unit = _isRunning = false
-
- /** Starts the broker service. */
- override def start(): Unit = _isRunning = true
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala
deleted file mode 100644
index 114c97f..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import com.ibm.spark.interpreter.broker.BrokerTransformer
-
-/**
- * Represents the transformer used by Apache SQL.
- */
-class SqlTransformer extends BrokerTransformer
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala
deleted file mode 100644
index a2fbd10..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import com.ibm.spark.interpreter.broker.BrokerTypesProvider
-
-/**
- * Represents all types associated with the SQL interface.
- */
-object SqlTypes extends BrokerTypesProvider
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala b/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala
deleted file mode 100644
index a8f439c..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.magic.builtin
-
-import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
-import com.ibm.spark.kernel.interpreter.sql.{SqlInterpreter, SqlException}
-import com.ibm.spark.kernel.protocol.v5.MIMEType
-import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
-import com.ibm.spark.magic.dependencies.IncludeKernel
-
-/**
- * Represents the magic interface to use the SQL interpreter.
- */
-class Sql extends CellMagic with IncludeKernel {
- override def execute(code: String): CellMagicOutput = {
- val sparkR = kernel.interpreter("SQL")
-
- if (sparkR.isEmpty || sparkR.get == null)
- throw new SqlException("SQL is not available!")
-
- sparkR.get match {
- case sparkRInterpreter: SqlInterpreter =>
- val (_, output) = sparkRInterpreter.interpret(code)
- output match {
- case Left(executeOutput) =>
- CellMagicOutput(MIMEType.PlainText -> executeOutput)
- case Right(executeFailure) => executeFailure match {
- case executeAborted: ExecuteAborted =>
- throw new SqlException("SQL code was aborted!")
- case executeError: ExecuteError =>
- throw new SqlException(executeError.value)
- }
- }
- case otherInterpreter =>
- val className = otherInterpreter.getClass.getName
- throw new SqlException(s"Invalid SQL interpreter: $className")
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala
new file mode 100644
index 0000000..2c0b4d5
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import com.ibm.spark.interpreter.broker.BrokerException
+
+/**
+ * Represents a generic SQL exception.
+ *
+ * @param message The message to associate with the exception
+ */
+class SqlException(message: String) extends BrokerException(message)
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
new file mode 100644
index 0000000..889d4a6
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import java.net.URL
+
+import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter}
+import com.ibm.spark.interpreter.Results.Result
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+
+import scala.concurrent.duration._
+import scala.concurrent.Await
+import scala.tools.nsc.interpreter.{OutputStream, InputStream}
+
+/**
+ * Represents an interpreter interface to Spark SQL.
+ */
+class SqlInterpreter() extends Interpreter {
+ private var _kernel: KernelLike = _
+ private lazy val sqlService = new SqlService(_kernel)
+ private lazy val sqlTransformer = new SqlTransformer
+
+ override def init(kernel: KernelLike): Interpreter = {
+ _kernel = kernel
+ this
+ }
+
+ /**
+ * Executes the provided code with the option to silence output.
+ * @param code The code to execute
+ * @param silent Whether or not to execute the code silently (no output)
+ * @return The success/failure of the interpretation and the output from the
+ * execution or the failure
+ */
+ override def interpret(code: String, silent: Boolean):
+ (Result, Either[ExecuteOutput, ExecuteFailure]) =
+ {
+ if (!sqlService.isRunning) sqlService.start()
+
+ val futureResult = sqlTransformer.transformToInterpreterResult(
+ sqlService.submitCode(code)
+ )
+
+ Await.result(futureResult, Duration.Inf)
+ }
+
+ /**
+ * Starts the interpreter, initializing any internal state.
+ * @return A reference to the interpreter
+ */
+ override def start(): Interpreter = {
+ sqlService.start()
+
+ this
+ }
+
+ /**
+ * Stops the interpreter, removing any previous internal state.
+ * @return A reference to the interpreter
+ */
+ override def stop(): Interpreter = {
+ sqlService.stop()
+
+ this
+ }
+
+ /**
+ * Returns the class loader used by this interpreter.
+ *
+ * @return The runtime class loader used by this interpreter
+ */
+ override def classLoader: ClassLoader = this.getClass.getClassLoader
+
+ // Unsupported (but can be invoked)
+ override def lastExecutionVariableName: Option[String] = None
+
+ // Unsupported (but can be invoked)
+ override def read(variableName: String): Option[AnyRef] = None
+
+ // Unsupported (but can be invoked)
+ override def completion(code: String, pos: Int): (Int, List[String]) =
+ (pos, Nil)
+
+ // Unsupported
+ override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
+
+ // Unsupported
+ override def classServerURI: String = ""
+
+ // Unsupported (but can be invoked)
+ override def bindSparkContext(sparkContext: SparkContext): Unit = {}
+
+ // Unsupported (but can be invoked)
+ override def bindSqlContext(sqlContext: SQLContext): Unit = {}
+
+ // Unsupported
+ override def interrupt(): Interpreter = ???
+
+ // Unsupported
+ override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
+
+ // Unsupported
+ override def addJars(jars: URL*): Unit = ???
+
+ // Unsupported
+ override def doQuietly[T](body: => T): T = ???
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala
new file mode 100644
index 0000000..2f2fed6
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import com.ibm.spark.kernel.api.KernelLike
+import java.io.ByteArrayOutputStream
+
+import com.ibm.spark.interpreter.broker.BrokerService
+import com.ibm.spark.kernel.interpreter.sql.SqlTypes._
+import org.apache.spark.sql.SQLContext
+
+import scala.concurrent.{Future, future}
+
+/**
+ * Represents the service that provides the high-level interface between the
+ * JVM and Spark SQL.
+ *
+ * @param kernel The SQL Context of Apache Spark to use to perform SQL
+ * queries
+ */
+class SqlService(private val kernel: KernelLike) extends BrokerService {
+ import scala.concurrent.ExecutionContext.Implicits.global
+
+ @volatile private var _isRunning: Boolean = false
+ override def isRunning: Boolean = _isRunning
+
+ /**
+ * Submits code to the broker service to be executed and return a result.
+ *
+ * @param code The code to execute
+ *
+ * @return The result as a future to eventually return
+ */
+ override def submitCode(code: Code): Future[CodeResults] = future {
+ println(s"Executing: '${code.trim}'")
+ val result = kernel.sqlContext.sql(code.trim)
+
+ // TODO: There is an internal method used for show called showString that
+ // supposedly is only for the Python API, look into why
+ val stringOutput = {
+ val outputStream = new ByteArrayOutputStream()
+ Console.withOut(outputStream) {
+ // TODO: Provide some way to change the number of records shown
+ result.show(10)
+ }
+ outputStream.toString("UTF-8")
+ }
+
+ stringOutput
+ }
+
+ /** Stops the running broker service. */
+ override def stop(): Unit = _isRunning = false
+
+ /** Starts the broker service. */
+ override def start(): Unit = _isRunning = true
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala
new file mode 100644
index 0000000..114c97f
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import com.ibm.spark.interpreter.broker.BrokerTransformer
+
+/**
+ * Represents the transformer used by Apache SQL.
+ */
+class SqlTransformer extends BrokerTransformer
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala
new file mode 100644
index 0000000..a2fbd10
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import com.ibm.spark.interpreter.broker.BrokerTypesProvider
+
+/**
+ * Represents all types associated with the SQL interface.
+ */
+object SqlTypes extends BrokerTypesProvider