Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:10 UTC

[02/51] [abbrv] incubator-toree git commit: Moved scala files to new locations based on new package

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala
deleted file mode 100644
index 81cb86e..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-/**
- * Provides reflective access into the backend R component that is not
- * publically accessible.
- */
-class ReflectiveRBackend {
-  private val rBackendClass = Class.forName("org.apache.spark.api.r.RBackend")
-  private val rBackendInstance = rBackendClass.newInstance()
-
-  /**
-   * Initializes the underlying RBackend service.
-   *
-   * @return The port used by the service
-   */
-  def init(): Int = {
-    val runMethod = rBackendClass.getDeclaredMethod("init")
-
-    runMethod.invoke(rBackendInstance).asInstanceOf[Int]
-  }
-
-  /** Blocks until the service has finished. */
-  def run(): Unit = {
-    val runMethod = rBackendClass.getDeclaredMethod("run")
-
-    runMethod.invoke(rBackendInstance)
-  }
-
-  /** Closes the underlying RBackend service. */
-  def close(): Unit = {
-    val runMethod = rBackendClass.getDeclaredMethod("close")
-
-    runMethod.invoke(rBackendInstance)
-  }
-}
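
For context, a minimal sketch of how a wrapper like ReflectiveRBackend is typically driven (hypothetical usage, not taken from the codebase): init() returns the backend port and run() blocks until the backend terminates, so run() is pushed onto its own thread.

    val backend = new ReflectiveRBackend
    val port = backend.init()          // reflectively invokes RBackend.init(), returning the listening port
    new Thread(new Runnable {
      override def run(): Unit = backend.run()   // blocks until the backend shuts down
    }).start()
    // ... launch the R process and point it at `port` ...
    backend.close()                    // tear the backend down when finished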

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala
deleted file mode 100644
index 44fa203..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, JavaSparkContextProducerLike, SQLContextProducerLike}
-import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge}
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-
-/**
- * Represents constants for the SparkR bridge.
- */
-object SparkRBridge {
-  /** Represents the maximum amount of code that can be queued for Python. */
-  val MaxQueuedCode = 500
-
-  /** Contains the bridge used by the current R process. */
-  @volatile private var _sparkRBridge: Option[SparkRBridge] = None
-
-  /** Allows kernel to set bridge dynamically. */
-  private[sparkr] def sparkRBridge_=(newSparkRBridge: SparkRBridge): Unit = {
-    _sparkRBridge = Some(newSparkRBridge)
-  }
-
-  /** Clears the bridge currently hosted statically. */
-  private[sparkr] def reset(): Unit = _sparkRBridge = None
-
-  /** Must be exposed in a static location for RBackend to access. */
-  def sparkRBridge: SparkRBridge = {
-    assert(_sparkRBridge.nonEmpty, "SparkRBridge has not been initialized!")
-    _sparkRBridge.get
-  }
-
-  /**
-   * Creates a new SparkRBridge instance.
-   *
-   * @param brokerState The container of broker state to expose
-   * @param kernel The kernel API to expose through the bridge
-   *
-   * @return The new SparkR bridge
-   */
-  def apply(
-    brokerState: BrokerState,
-    kernel: KernelLike
-    ): SparkRBridge = {
-    new SparkRBridge(
-      _brokerState = brokerState,
-      _kernel = kernel
-    ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
-  }
-}
-
-/**
- * Represents the API available to SparkR to act as the bridge for data
- * between the JVM and R.
- *
- * @param _brokerState The container of broker state to expose
- * @param _kernel The kernel API to expose through the bridge
- */
-class SparkRBridge private (
-  private val _brokerState: BrokerState,
-  private val _kernel: KernelLike
-) extends BrokerBridge(_brokerState, _kernel) {
-  override val brokerName: String = "SparkR"
-}
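
A rough sketch of the static hand-off the companion object above enables (hypothetical wiring; brokerState and kernel are assumed to be in scope, and the setter is package-private, so this runs from within the sparkr package):

    // Kernel side: build and publish the bridge before the R process starts
    val bridge = SparkRBridge(brokerState, kernel)
    SparkRBridge.sparkRBridge = bridge

    // R-facing side: RBackend callbacks can only reach the bridge statically
    val state = SparkRBridge.sparkRBridge.state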

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala
deleted file mode 100644
index 0be8f61..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerException
-
-/**
- * Represents a generic SparkR exception.
- *
- * @param message The message to associate with the exception
- */
-class SparkRException(message: String) extends BrokerException(message)

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
deleted file mode 100644
index 45fe03c..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import java.net.URL
-
-import com.ibm.spark.interpreter.Results.Result
-import com.ibm.spark.interpreter._
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.tools.nsc.interpreter.{InputStream, OutputStream}
-
-/**
- * Represents an interpreter interface to SparkR. Requires a properly-set
- * SPARK_HOME pointing to a binary distribution (needs packaged SparkR library)
- * and an implementation of R on the path.
- *
- */
-class SparkRInterpreter(
-) extends Interpreter {
-  private val logger = LoggerFactory.getLogger(this.getClass)
-  private var _kernel: KernelLike = _
-
-  // TODO: Replace hard-coded maximum queue count
-  /** Represents the state used by this interpreter's R instance. */
-  private lazy val sparkRState = new SparkRState(500)
-
-  /** Represents the bridge used by this interpreter's R instance. */
-  private lazy val sparkRBridge = SparkRBridge(
-    sparkRState,
-    _kernel
-  )
-
-  /** Represents the interface for R to talk to JVM Spark components. */
-  private lazy val rBackend = new ReflectiveRBackend
-
-  /** Represents the process handler used for the SparkR process. */
-  private lazy val sparkRProcessHandler: SparkRProcessHandler =
-    new SparkRProcessHandler(
-      sparkRBridge,
-      restartOnFailure = true,
-      restartOnCompletion = true
-    )
-
-  private lazy val sparkRService = new SparkRService(
-    rBackend,
-    sparkRBridge,
-    sparkRProcessHandler
-  )
-  private lazy val sparkRTransformer = new SparkRTransformer
-
-  override def init(kernel: KernelLike): Interpreter = {
-    _kernel = kernel
-    this
-  }
-
-  /**
-   * Executes the provided code with the option to silence output.
-   * @param code The code to execute
-   * @param silent Whether or not to execute the code silently (no output)
-   * @return The success/failure of the interpretation and the output from the
-   *         execution or the failure
-   */
-  override def interpret(code: String, silent: Boolean):
-    (Result, Either[ExecuteOutput, ExecuteFailure]) =
-  {
-    if (!sparkRService.isRunning) sparkRService.start()
-
-    val futureResult = sparkRTransformer.transformToInterpreterResult(
-      sparkRService.submitCode(code)
-    )
-
-    Await.result(futureResult, Duration.Inf)
-  }
-
-  /**
-   * Starts the interpreter, initializing any internal state.
-   * @return A reference to the interpreter
-   */
-  override def start(): Interpreter = {
-    sparkRService.start()
-
-    this
-  }
-
-  /**
-   * Stops the interpreter, removing any previous internal state.
-   * @return A reference to the interpreter
-   */
-  override def stop(): Interpreter = {
-    sparkRService.stop()
-
-    this
-  }
-
-  /**
-   * Returns the class loader used by this interpreter.
-   *
-   * @return The runtime class loader used by this interpreter
-   */
-  override def classLoader: ClassLoader = this.getClass.getClassLoader
-
-  // Unsupported (but can be invoked)
-  override def lastExecutionVariableName: Option[String] = None
-
-  // Unsupported (but can be invoked)
-  override def read(variableName: String): Option[AnyRef] = None
-
-  // Unsupported (but can be invoked)
-  override def completion(code: String, pos: Int): (Int, List[String]) =
-    (pos, Nil)
-
-  // Unsupported
-  override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
-
-  // Unsupported
-  override def classServerURI: String = ""
-
-  // Unsupported (but can be invoked)
-  override def bindSparkContext(sparkContext: SparkContext): Unit = {}
-
-  // Unsupported (but can be invoked)
-  override def bindSqlContext(sqlContext: SQLContext): Unit = {}
-
-  // Unsupported
-  override def interrupt(): Interpreter = ???
-
-  // Unsupported
-  override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
-
-  // Unsupported
-  override def addJars(jars: URL*): Unit = ???
-
-  // Unsupported
-  override def doQuietly[T](body: => T): T = ???
-}
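
A hedged end-to-end sketch of driving this interpreter (hypothetical; kernel is assumed to be a KernelLike supplied by the hosting kernel, and interpret is called through the single-argument form also used by the SparkR magic later in this commit):

    val interpreter = new SparkRInterpreter().init(kernel)
    interpreter.start()                       // brings up the RBackend and the Rscript process

    val (result, output) = interpreter.interpret("""head(iris)""")
    output match {
      case Left(executeOutput)   => println(executeOutput)               // plain-text output from R
      case Right(executeFailure) => println(s"SparkR failure: $executeFailure")
    }

    interpreter.stop()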

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala
deleted file mode 100644
index 2429dc4..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerProcess
-import scala.collection.JavaConverters._
-
-/**
- * Represents the R process used to evaluate SparkR code.
- *
- * @param sparkRBridge The bridge to use to retrieve kernel output streams
- *                      and the Spark version to be verified
- * @param sparkRProcessHandler The handler to use when the process fails or
- *                             completes
- * @param port The port to provide to the SparkR process to use to connect
- *             back to the JVM
- */
-class SparkRProcess(
-  private val sparkRBridge: SparkRBridge,
-  private val sparkRProcessHandler: SparkRProcessHandler,
-  private val port: Int
-) extends BrokerProcess(
-  processName = "Rscript",
-  entryResource = "kernelR/sparkr_runner.R",
-  otherResources = Seq("kernelR/sparkr_runner_utils.R", "sparkr_bundle.tar.gz"),
-  brokerBridge = sparkRBridge,
-  brokerProcessHandler = sparkRProcessHandler,
-  arguments = Seq(
-    "--default-packages=datasets,utils,grDevices,graphics,stats,methods"
-  )
-) {
-  override val brokerName: String = "SparkR"
-  private val sparkHome = Option(System.getenv("SPARK_HOME"))
-    .orElse(Option(System.getProperty("spark.home")))
-
-  assert(sparkHome.nonEmpty, "SparkR process requires Spark Home to be set!")
-
-  /**
-   * Creates a new process environment to be used for environment variable
-   * retrieval by the new process.
-   *
-   * @return The map of environment variables and their respective values
-   */
-  override protected def newProcessEnvironment(): Map[String, String] = {
-    val baseEnvironment = super.newProcessEnvironment()
-
-    // Note: Adding the new map values should override the old ones
-    baseEnvironment ++ Map(
-      "SPARK_HOME"                    -> sparkHome.get,
-      "EXISTING_SPARKR_BACKEND_PORT"  -> port.toString
-    )
-  }
-}
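
The environment construction above relies on Map's ++ operator, where entries from the right-hand map win on key collisions; a small self-contained illustration:

    val base    = Map("SPARK_HOME" -> "/opt/spark-old", "PATH" -> "/usr/bin")
    val overlay = Map("SPARK_HOME" -> "/opt/spark", "EXISTING_SPARKR_BACKEND_PORT" -> "12345")

    val merged = base ++ overlay
    // merged("SPARK_HOME") == "/opt/spark"   (overlay value wins)
    // merged("PATH")       == "/usr/bin"     (untouched keys are preserved)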

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala
deleted file mode 100644
index d33d265..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerProcessHandler
-
-/**
- * Represents the handler for events triggered by the SparkR process.
- *
- * @param sparkRBridge The bridge to reset when the process fails or completes
- * @param restartOnFailure If true, restarts the process if it fails
- * @param restartOnCompletion If true, restarts the process if it completes
- */
-class SparkRProcessHandler(
-  private val sparkRBridge: SparkRBridge,
-  private val restartOnFailure: Boolean,
-  private val restartOnCompletion: Boolean
-) extends BrokerProcessHandler(
-  sparkRBridge,
-  restartOnFailure,
-  restartOnCompletion
-) {
-  override val brokerName: String = "SparkR"
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala
deleted file mode 100644
index 71731a8..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import java.util.concurrent.{TimeUnit, Semaphore}
-
-import com.ibm.spark.interpreter.broker.BrokerService
-import com.ibm.spark.kernel.api.KernelLike
-import com.ibm.spark.kernel.interpreter.sparkr.SparkRTypes.{Code, CodeResults}
-import org.apache.spark.SparkContext
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.{future, Future}
-
-/**
- * Represents the service that provides the high-level interface between the
- * JVM and R.
- *
- * @param rBackend The backend to start to communicate between the JVM and R
- * @param sparkRBridge The bridge to use for communication between the JVM and R
- * @param sparkRProcessHandler The handler used for events that occur with the
- *                             SparkR process
- */
-class SparkRService(
-  private val rBackend: ReflectiveRBackend,
-  private val sparkRBridge: SparkRBridge,
-  private val sparkRProcessHandler: SparkRProcessHandler
-) extends BrokerService {
-  private val logger = LoggerFactory.getLogger(this.getClass)
-  @volatile private var rBackendPort: Int = -1
-  @volatile private var _isRunning: Boolean = false
-  override def isRunning: Boolean = _isRunning
-
-  /** Represents the process used to execute R code via the bridge. */
-  private lazy val sparkRProcess: SparkRProcess = {
-    val p = new SparkRProcess(
-      sparkRBridge,
-      sparkRProcessHandler,
-      rBackendPort
-    )
-
-    // Update handlers to correctly reset and restart the process
-    sparkRProcessHandler.setResetMethod(message => {
-      p.stop()
-      sparkRBridge.state.reset(message)
-    })
-    sparkRProcessHandler.setRestartMethod(() => p.start())
-
-    p
-  }
-
-  /** Starts the SparkR service. */
-  override def start(): Unit = {
-    logger.debug("Initializing statically-accessible SparkR bridge")
-    SparkRBridge.sparkRBridge = sparkRBridge
-
-    val initialized = new Semaphore(0)
-    import scala.concurrent.ExecutionContext.Implicits.global
-    val rBackendRun = future {
-      logger.debug("Initializing RBackend")
-      rBackendPort = rBackend.init()
-      logger.debug(s"RBackend running on port $rBackendPort")
-      initialized.release()
-      logger.debug("Running RBackend")
-      rBackend.run()
-      logger.debug("RBackend has finished")
-    }
-
-    // Wait for backend to start before starting R process to connect
-    val backendTimeout =
-      sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
-    if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
-      // Start the R process used to execute code
-      logger.debug("Launching process to execute R code")
-      sparkRProcess.start()
-      _isRunning = true
-    } else {
-      // Unable to initialize, so throw an exception
-      throw new SparkRException(
-        s"Unable to initialize R backend in $backendTimeout seconds!")
-    }
-  }
-
-  /**
-   * Submits code to the SparkR service to be executed and return a result.
-   *
-   * @param code The code to execute
-   *
-   * @return The result as a future to eventually return
-   */
-  override def submitCode(code: Code): Future[CodeResults] = {
-    sparkRBridge.state.pushCode(code)
-  }
-
-  /** Stops the running SparkR service. */
-  override def stop(): Unit = {
-    // Stop the R process used to execute code
-    sparkRProcess.stop()
-
-    // Stop the server used as an entrypoint for R
-    rBackend.close()
-
-    // Clear the bridge
-    SparkRBridge.reset()
-
-    _isRunning = false
-  }
-}
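
The start-up sequence above is an instance of a semaphore handshake: a background future releases a permit once the backend port is known, and the caller waits on it with a timeout. A standalone sketch of the pattern with generic names (not Toree APIs):

    import java.util.concurrent.{Semaphore, TimeUnit}
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object BackendHandshake {
      private val ready = new Semaphore(0)
      @volatile private var port: Int = -1

      def main(args: Array[String]): Unit = {
        Future {
          port = 12345       // stand-in for rBackend.init()
          ready.release()    // signal that the port is now known
          // a long-running serve loop (the analogue of rBackend.run()) would follow here
        }

        val timeoutSeconds = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
        if (ready.tryAcquire(timeoutSeconds, TimeUnit.SECONDS))
          println(s"backend ready on port $port")
        else
          throw new RuntimeException(s"backend did not start within $timeoutSeconds seconds")
      }
    }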

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala
deleted file mode 100644
index 60f67a3..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerState
-
-/**
- * Represents the state structure of SparkR.
- *
- * @param maxQueuedCode The maximum amount of code to support being queued
- *                      at the same time for SparkR execution
- */
-class SparkRState(private val maxQueuedCode: Int)
-  extends BrokerState(maxQueuedCode)

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala
deleted file mode 100644
index 45c44c0..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerTransformer
-
-/**
- * Represents the transformer used by SparkR.
- */
-class SparkRTransformer extends BrokerTransformer

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala
deleted file mode 100644
index 11f33c0..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sparkr
-
-import com.ibm.spark.interpreter.broker.BrokerTypesProvider
-
-/**
- * Represents all types associated with the SparkR interface.
- */
-object SparkRTypes extends BrokerTypesProvider

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala
deleted file mode 100644
index 872d376..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter
-
-import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise}
-
-/**
- * Contains aliases to broker types.
- */
-package object sparkr {
-  /**
-   * Represents a promise made regarding the completion of SparkR code
-   * execution.
-   */
-  type SparkRPromise = BrokerPromise
-
-  /**
-   * Represents a block of SparkR code to be evaluated.
-   */
-  type SparkRCode = BrokerCode
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala
deleted file mode 100644
index 7eba136..0000000
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.magic.builtin
-
-import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
-import com.ibm.spark.kernel.interpreter.sparkr.{SparkRInterpreter, SparkRException}
-import com.ibm.spark.kernel.protocol.v5.MIMEType
-import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
-import com.ibm.spark.magic.dependencies.IncludeKernel
-
-/**
- * Represents the magic interface to use the SparkR interpreter.
- */
-class SparkR extends CellMagic with IncludeKernel {
-  override def execute(code: String): CellMagicOutput = {
-    val sparkR = kernel.interpreter("SparkR")
-
-    if (sparkR.isEmpty || sparkR.get == null)
-      throw new SparkRException("SparkR is not available!")
-
-    sparkR.get match {
-      case sparkRInterpreter: SparkRInterpreter =>
-        val (_, output) = sparkRInterpreter.interpret(code)
-        output match {
-          case Left(executeOutput) =>
-            CellMagicOutput(MIMEType.PlainText -> executeOutput)
-          case Right(executeFailure) => executeFailure match {
-            case executeAborted: ExecuteAborted =>
-              throw new SparkRException("SparkR code was aborted!")
-            case executeError: ExecuteError =>
-              throw new SparkRException(executeError.value)
-          }
-        }
-      case otherInterpreter =>
-        val className = otherInterpreter.getClass.getName
-        throw new SparkRException(s"Invalid SparkR interpreter: $className")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala
new file mode 100644
index 0000000..81cb86e
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+/**
+ * Provides reflective access into the backend R component that is not
+ * publically accessible.
+ */
+class ReflectiveRBackend {
+  private val rBackendClass = Class.forName("org.apache.spark.api.r.RBackend")
+  private val rBackendInstance = rBackendClass.newInstance()
+
+  /**
+   * Initializes the underlying RBackend service.
+   *
+   * @return The port used by the service
+   */
+  def init(): Int = {
+    val runMethod = rBackendClass.getDeclaredMethod("init")
+
+    runMethod.invoke(rBackendInstance).asInstanceOf[Int]
+  }
+
+  /** Blocks until the service has finished. */
+  def run(): Unit = {
+    val runMethod = rBackendClass.getDeclaredMethod("run")
+
+    runMethod.invoke(rBackendInstance)
+  }
+
+  /** Closes the underlying RBackend service. */
+  def close(): Unit = {
+    val runMethod = rBackendClass.getDeclaredMethod("close")
+
+    runMethod.invoke(rBackendInstance)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
new file mode 100644
index 0000000..44fa203
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, JavaSparkContextProducerLike, SQLContextProducerLike}
+import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge}
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+
+/**
+ * Represents constants for the SparkR bridge.
+ */
+object SparkRBridge {
+  /** Represents the maximum amount of code that can be queued for Python. */
+  val MaxQueuedCode = 500
+
+  /** Contains the bridge used by the current R process. */
+  @volatile private var _sparkRBridge: Option[SparkRBridge] = None
+
+  /** Allows kernel to set bridge dynamically. */
+  private[sparkr] def sparkRBridge_=(newSparkRBridge: SparkRBridge): Unit = {
+    _sparkRBridge = Some(newSparkRBridge)
+  }
+
+  /** Clears the bridge currently hosted statically. */
+  private[sparkr] def reset(): Unit = _sparkRBridge = None
+
+  /** Must be exposed in a static location for RBackend to access. */
+  def sparkRBridge: SparkRBridge = {
+    assert(_sparkRBridge.nonEmpty, "SparkRBridge has not been initialized!")
+    _sparkRBridge.get
+  }
+
+  /**
+   * Creates a new SparkRBridge instance.
+   *
+   * @param brokerState The container of broker state to expose
+   * @param kernel The kernel API to expose through the bridge
+   *
+   * @return The new SparkR bridge
+   */
+  def apply(
+    brokerState: BrokerState,
+    kernel: KernelLike
+    ): SparkRBridge = {
+    new SparkRBridge(
+      _brokerState = brokerState,
+      _kernel = kernel
+    ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
+  }
+}
+
+/**
+ * Represents the API available to SparkR to act as the bridge for data
+ * between the JVM and R.
+ *
+ * @param _brokerState The container of broker state to expose
+ * @param _kernel The kernel API to expose through the bridge
+ */
+class SparkRBridge private (
+  private val _brokerState: BrokerState,
+  private val _kernel: KernelLike
+) extends BrokerBridge(_brokerState, _kernel) {
+  override val brokerName: String = "SparkR"
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala
new file mode 100644
index 0000000..0be8f61
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerException
+
+/**
+ * Represents a generic SparkR exception.
+ *
+ * @param message The message to associate with the exception
+ */
+class SparkRException(message: String) extends BrokerException(message)

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
new file mode 100644
index 0000000..45fe03c
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import java.net.URL
+
+import com.ibm.spark.interpreter.Results.Result
+import com.ibm.spark.interpreter._
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.tools.nsc.interpreter.{InputStream, OutputStream}
+
+/**
+ * Represents an interpreter interface to SparkR. Requires a properly-set
+ * SPARK_HOME pointing to a binary distribution (needs packaged SparkR library)
+ * and an implementation of R on the path.
+ *
+ */
+class SparkRInterpreter(
+) extends Interpreter {
+  private val logger = LoggerFactory.getLogger(this.getClass)
+  private var _kernel: KernelLike = _
+
+  // TODO: Replace hard-coded maximum queue count
+  /** Represents the state used by this interpreter's R instance. */
+  private lazy val sparkRState = new SparkRState(500)
+
+  /** Represents the bridge used by this interpreter's R instance. */
+  private lazy val sparkRBridge = SparkRBridge(
+    sparkRState,
+    _kernel
+  )
+
+  /** Represents the interface for R to talk to JVM Spark components. */
+  private lazy val rBackend = new ReflectiveRBackend
+
+  /** Represents the process handler used for the SparkR process. */
+  private lazy val sparkRProcessHandler: SparkRProcessHandler =
+    new SparkRProcessHandler(
+      sparkRBridge,
+      restartOnFailure = true,
+      restartOnCompletion = true
+    )
+
+  private lazy val sparkRService = new SparkRService(
+    rBackend,
+    sparkRBridge,
+    sparkRProcessHandler
+  )
+  private lazy val sparkRTransformer = new SparkRTransformer
+
+  override def init(kernel: KernelLike): Interpreter = {
+    _kernel = kernel
+    this
+  }
+
+  /**
+   * Executes the provided code with the option to silence output.
+   * @param code The code to execute
+   * @param silent Whether or not to execute the code silently (no output)
+   * @return The success/failure of the interpretation and the output from the
+   *         execution or the failure
+   */
+  override def interpret(code: String, silent: Boolean):
+    (Result, Either[ExecuteOutput, ExecuteFailure]) =
+  {
+    if (!sparkRService.isRunning) sparkRService.start()
+
+    val futureResult = sparkRTransformer.transformToInterpreterResult(
+      sparkRService.submitCode(code)
+    )
+
+    Await.result(futureResult, Duration.Inf)
+  }
+
+  /**
+   * Starts the interpreter, initializing any internal state.
+   * @return A reference to the interpreter
+   */
+  override def start(): Interpreter = {
+    sparkRService.start()
+
+    this
+  }
+
+  /**
+   * Stops the interpreter, removing any previous internal state.
+   * @return A reference to the interpreter
+   */
+  override def stop(): Interpreter = {
+    sparkRService.stop()
+
+    this
+  }
+
+  /**
+   * Returns the class loader used by this interpreter.
+   *
+   * @return The runtime class loader used by this interpreter
+   */
+  override def classLoader: ClassLoader = this.getClass.getClassLoader
+
+  // Unsupported (but can be invoked)
+  override def lastExecutionVariableName: Option[String] = None
+
+  // Unsupported (but can be invoked)
+  override def read(variableName: String): Option[AnyRef] = None
+
+  // Unsupported (but can be invoked)
+  override def completion(code: String, pos: Int): (Int, List[String]) =
+    (pos, Nil)
+
+  // Unsupported
+  override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
+
+  // Unsupported
+  override def classServerURI: String = ""
+
+  // Unsupported (but can be invoked)
+  override def bindSparkContext(sparkContext: SparkContext): Unit = {}
+
+  // Unsupported (but can be invoked)
+  override def bindSqlContext(sqlContext: SQLContext): Unit = {}
+
+  // Unsupported
+  override def interrupt(): Interpreter = ???
+
+  // Unsupported
+  override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
+
+  // Unsupported
+  override def addJars(jars: URL*): Unit = ???
+
+  // Unsupported
+  override def doQuietly[T](body: => T): T = ???
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
new file mode 100644
index 0000000..2429dc4
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerProcess
+import scala.collection.JavaConverters._
+
+/**
+ * Represents the R process used to evaluate SparkR code.
+ *
+ * @param sparkRBridge The bridge to use to retrieve kernel output streams
+ *                      and the Spark version to be verified
+ * @param sparkRProcessHandler The handler to use when the process fails or
+ *                             completes
+ * @param port The port to provide to the SparkR process to use to connect
+ *             back to the JVM
+ */
+class SparkRProcess(
+  private val sparkRBridge: SparkRBridge,
+  private val sparkRProcessHandler: SparkRProcessHandler,
+  private val port: Int
+) extends BrokerProcess(
+  processName = "Rscript",
+  entryResource = "kernelR/sparkr_runner.R",
+  otherResources = Seq("kernelR/sparkr_runner_utils.R", "sparkr_bundle.tar.gz"),
+  brokerBridge = sparkRBridge,
+  brokerProcessHandler = sparkRProcessHandler,
+  arguments = Seq(
+    "--default-packages=datasets,utils,grDevices,graphics,stats,methods"
+  )
+) {
+  override val brokerName: String = "SparkR"
+  private val sparkHome = Option(System.getenv("SPARK_HOME"))
+    .orElse(Option(System.getProperty("spark.home")))
+
+  assert(sparkHome.nonEmpty, "SparkR process requires Spark Home to be set!")
+
+  /**
+   * Creates a new process environment to be used for environment variable
+   * retrieval by the new process.
+   *
+   * @return The map of environment variables and their respective values
+   */
+  override protected def newProcessEnvironment(): Map[String, String] = {
+    val baseEnvironment = super.newProcessEnvironment()
+
+    // Note: Adding the new map values should override the old ones
+    baseEnvironment ++ Map(
+      "SPARK_HOME"                    -> sparkHome.get,
+      "EXISTING_SPARKR_BACKEND_PORT"  -> port.toString
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala
new file mode 100644
index 0000000..d33d265
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerProcessHandler
+
+/**
+ * Represents the handler for events triggered by the SparkR process.
+ *
+ * @param sparkRBridge The bridge to reset when the process fails or completes
+ * @param restartOnFailure If true, restarts the process if it fails
+ * @param restartOnCompletion If true, restarts the process if it completes
+ */
+class SparkRProcessHandler(
+  private val sparkRBridge: SparkRBridge,
+  private val restartOnFailure: Boolean,
+  private val restartOnCompletion: Boolean
+) extends BrokerProcessHandler(
+  sparkRBridge,
+  restartOnFailure,
+  restartOnCompletion
+) {
+  override val brokerName: String = "SparkR"
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
new file mode 100644
index 0000000..71731a8
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import java.util.concurrent.{TimeUnit, Semaphore}
+
+import com.ibm.spark.interpreter.broker.BrokerService
+import com.ibm.spark.kernel.api.KernelLike
+import com.ibm.spark.kernel.interpreter.sparkr.SparkRTypes.{Code, CodeResults}
+import org.apache.spark.SparkContext
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.{future, Future}
+
+/**
+ * Represents the service that provides the high-level interface between the
+ * JVM and R.
+ *
+ * @param rBackend The backend to start to communicate between the JVM and R
+ * @param sparkRBridge The bridge to use for communication between the JVM and R
+ * @param sparkRProcessHandler The handler used for events that occur with the
+ *                             SparkR process
+ */
+class SparkRService(
+  private val rBackend: ReflectiveRBackend,
+  private val sparkRBridge: SparkRBridge,
+  private val sparkRProcessHandler: SparkRProcessHandler
+) extends BrokerService {
+  private val logger = LoggerFactory.getLogger(this.getClass)
+  @volatile private var rBackendPort: Int = -1
+  @volatile private var _isRunning: Boolean = false
+  override def isRunning: Boolean = _isRunning
+
+  /** Represents the process used to execute R code via the bridge. */
+  private lazy val sparkRProcess: SparkRProcess = {
+    val p = new SparkRProcess(
+      sparkRBridge,
+      sparkRProcessHandler,
+      rBackendPort
+    )
+
+    // Update handlers to correctly reset and restart the process
+    sparkRProcessHandler.setResetMethod(message => {
+      p.stop()
+      sparkRBridge.state.reset(message)
+    })
+    sparkRProcessHandler.setRestartMethod(() => p.start())
+
+    p
+  }
+
+  /** Starts the SparkR service. */
+  override def start(): Unit = {
+    logger.debug("Initializing statically-accessible SparkR bridge")
+    SparkRBridge.sparkRBridge = sparkRBridge
+
+    val initialized = new Semaphore(0)
+    import scala.concurrent.ExecutionContext.Implicits.global
+    val rBackendRun = future {
+      logger.debug("Initializing RBackend")
+      rBackendPort = rBackend.init()
+      logger.debug(s"RBackend running on port $rBackendPort")
+      initialized.release()
+      logger.debug("Running RBackend")
+      rBackend.run()
+      logger.debug("RBackend has finished")
+    }
+
+    // Wait for backend to start before starting R process to connect
+    val backendTimeout =
+      sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
+    if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
+      // Start the R process used to execute code
+      logger.debug("Launching process to execute R code")
+      sparkRProcess.start()
+      _isRunning = true
+    } else {
+      // Unable to initialize, so throw an exception
+      throw new SparkRException(
+        s"Unable to initialize R backend in $backendTimeout seconds!")
+    }
+  }
+
+  /**
+   * Submits code to the SparkR service to be executed and return a result.
+   *
+   * @param code The code to execute
+   *
+   * @return The result as a future to eventually return
+   */
+  override def submitCode(code: Code): Future[CodeResults] = {
+    sparkRBridge.state.pushCode(code)
+  }
+
+  /** Stops the running SparkR service. */
+  override def stop(): Unit = {
+    // Stop the R process used to execute code
+    sparkRProcess.stop()
+
+    // Stop the server used as an entrypoint for R
+    rBackend.close()
+
+    // Clear the bridge
+    SparkRBridge.reset()
+
+    _isRunning = false
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala
new file mode 100644
index 0000000..60f67a3
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerState
+
+/**
+ * Represents the state structure of SparkR.
+ *
+ * @param maxQueuedCode The maximum amount of code to support being queued
+ *                      at the same time for SparkR execution
+ */
+class SparkRState(private val maxQueuedCode: Int)
+  extends BrokerState(maxQueuedCode)

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala
new file mode 100644
index 0000000..45c44c0
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerTransformer
+
+/**
+ * Represents the transformer used by SparkR.
+ */
+class SparkRTransformer extends BrokerTransformer
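
The transformer is only a SparkR-named BrokerTransformer; its job, as seen in SqlInterpreter.interpret below, is to turn the service's Future[CodeResults] into the (Result, Either[ExecuteOutput, ExecuteFailure]) pair the kernel expects. A hedged sketch of that call:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import com.ibm.spark.kernel.interpreter.sparkr.SparkRTypes._

    val transformer = new SparkRTransformer

    // `codeFuture` would come from SparkRService.submitCode(...)
    def toInterpreterResult(codeFuture: Future[CodeResults]) =
      Await.result(transformer.transformToInterpreterResult(codeFuture), Duration.Inf)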

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala
new file mode 100644
index 0000000..11f33c0
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sparkr
+
+import com.ibm.spark.interpreter.broker.BrokerTypesProvider
+
+/**
+ * Represents all types associated with the SparkR interface.
+ */
+object SparkRTypes extends BrokerTypesProvider
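
Importing the provider's members is what brings the Code and CodeResults aliases into scope, the same way SqlService imports SqlTypes._ below. A tiny sketch (both aliases appear to resolve to String, judging by SqlService, but that is an assumption here):

    import scala.concurrent.Future
    import com.ibm.spark.kernel.interpreter.sparkr.SparkRTypes._

    // Broker-facing signatures can then be written against the aliases.
    def submit(code: Code): Future[CodeResults] = ???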

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala
new file mode 100644
index 0000000..872d376
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter
+
+import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise}
+
+/**
+ * Contains aliases to broker types.
+ */
+package object sparkr {
+  /**
+   * Represents a promise made regarding the completion of SparkR code
+   * execution.
+   */
+  type SparkRPromise = BrokerPromise
+
+  /**
+   * Represents a block of SparkR code to be evaluated.
+   */
+  type SparkRCode = BrokerCode
+}
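
Because these are plain type aliases, SparkRCode and SparkRPromise are interchangeable with the broker types they alias. A one-line illustration:

    import com.ibm.spark.interpreter.broker.BrokerCode
    import com.ibm.spark.kernel.interpreter.sparkr.SparkRCode

    // Compiles because SparkRCode is defined as an alias of BrokerCode.
    def asBrokerCode(code: SparkRCode): BrokerCode = code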

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala
new file mode 100644
index 0000000..7eba136
--- /dev/null
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.magic.builtin
+
+import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
+import com.ibm.spark.kernel.interpreter.sparkr.{SparkRInterpreter, SparkRException}
+import com.ibm.spark.kernel.protocol.v5.MIMEType
+import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
+import com.ibm.spark.magic.dependencies.IncludeKernel
+
+/**
+ * Represents the magic interface to use the SparkR interpreter.
+ */
+class SparkR extends CellMagic with IncludeKernel {
+  override def execute(code: String): CellMagicOutput = {
+    val sparkR = kernel.interpreter("SparkR")
+
+    if (sparkR.isEmpty || sparkR.get == null)
+      throw new SparkRException("SparkR is not available!")
+
+    sparkR.get match {
+      case sparkRInterpreter: SparkRInterpreter =>
+        val (_, output) = sparkRInterpreter.interpret(code)
+        output match {
+          case Left(executeOutput) =>
+            CellMagicOutput(MIMEType.PlainText -> executeOutput)
+          case Right(executeFailure) => executeFailure match {
+            case executeAborted: ExecuteAborted =>
+              throw new SparkRException("SparkR code was aborted!")
+            case executeError: ExecuteError =>
+              throw new SparkRException(executeError.value)
+          }
+        }
+      case otherInterpreter =>
+        val className = otherInterpreter.getClass.getName
+        throw new SparkRException(s"Invalid SparkR interpreter: $className")
+    }
+  }
+}
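
A cell beginning with %%SparkR is dispatched to this magic by the kernel. A hedged sketch of driving it directly from Scala, assuming CellMagicOutput can be read like a Map keyed by MIME type (its exact shape is not shown in this commit) and that the kernel dependency has been injected via IncludeKernel:

    import com.ibm.spark.kernel.protocol.v5.MIMEType

    // Hypothetical direct call; normally the kernel constructs the magic and wires in `kernel`.
    val magic = new SparkR()
    val out = magic.execute("head(iris)")  // R code forwarded to the SparkR interpreter
    println(out(MIMEType.PlainText))       // assumed map-like access to the plain-text result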

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala
deleted file mode 100644
index 2c0b4d5..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import com.ibm.spark.interpreter.broker.BrokerException
-
-/**
- * Represents a generic SQL exception.
- *
- * @param message The message to associate with the exception
- */
-class SqlException(message: String) extends BrokerException(message)
-

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
deleted file mode 100644
index 889d4a6..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import java.net.URL
-
-import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter}
-import com.ibm.spark.interpreter.Results.Result
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-
-import scala.concurrent.duration._
-import scala.concurrent.Await
-import scala.tools.nsc.interpreter.{OutputStream, InputStream}
-
-/**
- * Represents an interpreter interface to Spark SQL.
- */
-class SqlInterpreter() extends Interpreter {
-  private var _kernel: KernelLike = _
-  private lazy val sqlService = new SqlService(_kernel)
-  private lazy val sqlTransformer = new SqlTransformer
-
-  override def init(kernel: KernelLike): Interpreter = {
-    _kernel = kernel
-    this
-  }
-
-  /**
-   * Executes the provided code with the option to silence output.
-   * @param code The code to execute
-   * @param silent Whether or not to execute the code silently (no output)
-   * @return The success/failure of the interpretation and the output from the
-   *         execution or the failure
-   */
-  override def interpret(code: String, silent: Boolean):
-    (Result, Either[ExecuteOutput, ExecuteFailure]) =
-  {
-    if (!sqlService.isRunning) sqlService.start()
-
-    val futureResult = sqlTransformer.transformToInterpreterResult(
-      sqlService.submitCode(code)
-    )
-
-    Await.result(futureResult, Duration.Inf)
-  }
-
-  /**
-   * Starts the interpreter, initializing any internal state.
-   * @return A reference to the interpreter
-   */
-  override def start(): Interpreter = {
-    sqlService.start()
-
-    this
-  }
-
-  /**
-   * Stops the interpreter, removing any previous internal state.
-   * @return A reference to the interpreter
-   */
-  override def stop(): Interpreter = {
-    sqlService.stop()
-
-    this
-  }
-
-  /**
-   * Returns the class loader used by this interpreter.
-   *
-   * @return The runtime class loader used by this interpreter
-   */
-  override def classLoader: ClassLoader = this.getClass.getClassLoader
-
-  // Unsupported (but can be invoked)
-  override def lastExecutionVariableName: Option[String] = None
-
-  // Unsupported (but can be invoked)
-  override def read(variableName: String): Option[AnyRef] = None
-
-  // Unsupported (but can be invoked)
-  override def completion(code: String, pos: Int): (Int, List[String]) =
-    (pos, Nil)
-
-  // Unsupported
-  override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
-
-  // Unsupported
-  override def classServerURI: String = ""
-
-  // Unsupported (but can be invoked)
-  override def bindSparkContext(sparkContext: SparkContext): Unit = {}
-
-  // Unsupported (but can be invoked)
-  override def bindSqlContext(sqlContext: SQLContext): Unit = {}
-
-  // Unsupported
-  override def interrupt(): Interpreter = ???
-
-  // Unsupported
-  override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
-
-  // Unsupported
-  override def addJars(jars: URL*): Unit = ???
-
-  // Unsupported
-  override def doQuietly[T](body: => T): T = ???
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala
deleted file mode 100644
index 2f2fed6..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import com.ibm.spark.kernel.api.KernelLike
-import java.io.ByteArrayOutputStream
-
-import com.ibm.spark.interpreter.broker.BrokerService
-import com.ibm.spark.kernel.interpreter.sql.SqlTypes._
-import org.apache.spark.sql.SQLContext
-
-import scala.concurrent.{Future, future}
-
-/**
- * Represents the service that provides the high-level interface between the
- * JVM and Spark SQL.
- *
- * @param kernel The SQL Context of Apache Spark to use to perform SQL
- *                   queries
- */
-class SqlService(private val kernel: KernelLike) extends BrokerService {
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  @volatile private var _isRunning: Boolean = false
-  override def isRunning: Boolean = _isRunning
-
-  /**
-   * Submits code to the broker service to be executed and return a result.
-   *
-   * @param code The code to execute
-   *
-   * @return The result as a future to eventually return
-   */
-  override def submitCode(code: Code): Future[CodeResults] = future {
-    println(s"Executing: '${code.trim}'")
-    val result = kernel.sqlContext.sql(code.trim)
-
-    // TODO: There is an internal method used for show called showString that
-    //       supposedly is only for the Python API, look into why
-    val stringOutput = {
-      val outputStream = new ByteArrayOutputStream()
-      Console.withOut(outputStream) {
-        // TODO: Provide some way to change the number of records shown
-        result.show(10)
-      }
-      outputStream.toString("UTF-8")
-    }
-
-    stringOutput
-  }
-
-  /** Stops the running broker service. */
-  override def stop(): Unit = _isRunning = false
-
-  /** Starts the broker service. */
-  override def start(): Unit = _isRunning = true
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala
deleted file mode 100644
index 114c97f..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import com.ibm.spark.interpreter.broker.BrokerTransformer
-
-/**
- * Represents the transformer used by Apache SQL.
- */
-class SqlTransformer extends BrokerTransformer

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala
deleted file mode 100644
index a2fbd10..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.sql
-
-import com.ibm.spark.interpreter.broker.BrokerTypesProvider
-
-/**
- * Represents all types associated with the SQL interface.
- */
-object SqlTypes extends BrokerTypesProvider

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala b/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala
deleted file mode 100644
index a8f439c..0000000
--- a/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.magic.builtin
-
-import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
-import com.ibm.spark.kernel.interpreter.sql.{SqlInterpreter, SqlException}
-import com.ibm.spark.kernel.protocol.v5.MIMEType
-import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
-import com.ibm.spark.magic.dependencies.IncludeKernel
-
-/**
- * Represents the magic interface to use the SQL interpreter.
- */
-class Sql extends CellMagic with IncludeKernel {
-  override def execute(code: String): CellMagicOutput = {
-    val sparkR = kernel.interpreter("SQL")
-
-    if (sparkR.isEmpty || sparkR.get == null)
-      throw new SqlException("SQL is not available!")
-
-    sparkR.get match {
-      case sparkRInterpreter: SqlInterpreter =>
-        val (_, output) = sparkRInterpreter.interpret(code)
-        output match {
-          case Left(executeOutput) =>
-            CellMagicOutput(MIMEType.PlainText -> executeOutput)
-          case Right(executeFailure) => executeFailure match {
-            case executeAborted: ExecuteAborted =>
-              throw new SqlException("SQL code was aborted!")
-            case executeError: ExecuteError =>
-              throw new SqlException(executeError.value)
-          }
-        }
-      case otherInterpreter =>
-        val className = otherInterpreter.getClass.getName
-        throw new SqlException(s"Invalid SQL interpreter: $className")
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala
new file mode 100644
index 0000000..2c0b4d5
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import com.ibm.spark.interpreter.broker.BrokerException
+
+/**
+ * Represents a generic SQL exception.
+ *
+ * @param message The message to associate with the exception
+ */
+class SqlException(message: String) extends BrokerException(message)
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
new file mode 100644
index 0000000..889d4a6
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import java.net.URL
+
+import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter}
+import com.ibm.spark.interpreter.Results.Result
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+
+import scala.concurrent.duration._
+import scala.concurrent.Await
+import scala.tools.nsc.interpreter.{OutputStream, InputStream}
+
+/**
+ * Represents an interpreter interface to Spark SQL.
+ */
+class SqlInterpreter() extends Interpreter {
+  private var _kernel: KernelLike = _
+  private lazy val sqlService = new SqlService(_kernel)
+  private lazy val sqlTransformer = new SqlTransformer
+
+  override def init(kernel: KernelLike): Interpreter = {
+    _kernel = kernel
+    this
+  }
+
+  /**
+   * Executes the provided code with the option to silence output.
+   * @param code The code to execute
+   * @param silent Whether or not to execute the code silently (no output)
+   * @return The success/failure of the interpretation, along with either the
+   *         execution output or the failure
+   */
+  override def interpret(code: String, silent: Boolean):
+    (Result, Either[ExecuteOutput, ExecuteFailure]) =
+  {
+    if (!sqlService.isRunning) sqlService.start()
+
+    val futureResult = sqlTransformer.transformToInterpreterResult(
+      sqlService.submitCode(code)
+    )
+
+    Await.result(futureResult, Duration.Inf)
+  }
+
+  /**
+   * Starts the interpreter, initializing any internal state.
+   * @return A reference to the interpreter
+   */
+  override def start(): Interpreter = {
+    sqlService.start()
+
+    this
+  }
+
+  /**
+   * Stops the interpreter, removing any previous internal state.
+   * @return A reference to the interpreter
+   */
+  override def stop(): Interpreter = {
+    sqlService.stop()
+
+    this
+  }
+
+  /**
+   * Returns the class loader used by this interpreter.
+   *
+   * @return The runtime class loader used by this interpreter
+   */
+  override def classLoader: ClassLoader = this.getClass.getClassLoader
+
+  // Unsupported (but can be invoked)
+  override def lastExecutionVariableName: Option[String] = None
+
+  // Unsupported (but can be invoked)
+  override def read(variableName: String): Option[AnyRef] = None
+
+  // Unsupported (but can be invoked)
+  override def completion(code: String, pos: Int): (Int, List[String]) =
+    (pos, Nil)
+
+  // Unsupported
+  override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
+
+  // Unsupported
+  override def classServerURI: String = ""
+
+  // Unsupported (but can be invoked)
+  override def bindSparkContext(sparkContext: SparkContext): Unit = {}
+
+  // Unsupported (but can be invoked)
+  override def bindSqlContext(sqlContext: SQLContext): Unit = {}
+
+  // Unsupported
+  override def interrupt(): Interpreter = ???
+
+  // Unsupported
+  override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
+
+  // Unsupported
+  override def addJars(jars: URL*): Unit = ???
+
+  // Unsupported
+  override def doQuietly[T](body: => T): T = ???
+}
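
interpret blocks on the broker future, so from the caller's point of view a query is synchronous. A minimal sketch, assuming a KernelLike instance is available to inject (the `kernel` value here is hypothetical):

    import com.ibm.spark.kernel.api.KernelLike

    def runQuery(kernel: KernelLike, query: String): Unit = {
      val interpreter = new SqlInterpreter().init(kernel)
      val (_, output) = interpreter.interpret(query, false)  // silent = false
      output match {
        case Left(text)     => println(text)                 // captured DataFrame.show output
        case Right(failure) => println(s"SQL failed: $failure")
      }
      interpreter.stop()
    }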

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala
new file mode 100644
index 0000000..2f2fed6
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import com.ibm.spark.kernel.api.KernelLike
+import java.io.ByteArrayOutputStream
+
+import com.ibm.spark.interpreter.broker.BrokerService
+import com.ibm.spark.kernel.interpreter.sql.SqlTypes._
+import org.apache.spark.sql.SQLContext
+
+import scala.concurrent.{Future, future}
+
+/**
+ * Represents the service that provides the high-level interface between the
+ * JVM and Spark SQL.
+ *
+ * @param kernel The kernel providing the Spark SQLContext used to perform
+ *               SQL queries
+ */
+class SqlService(private val kernel: KernelLike) extends BrokerService {
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  @volatile private var _isRunning: Boolean = false
+  override def isRunning: Boolean = _isRunning
+
+  /**
+   * Submits code to the broker service to be executed, returning the result.
+   *
+   * @param code The code to execute
+   *
+   * @return A future that eventually completes with the execution result
+   */
+  override def submitCode(code: Code): Future[CodeResults] = future {
+    println(s"Executing: '${code.trim}'")
+    val result = kernel.sqlContext.sql(code.trim)
+
+    // TODO: show delegates to an internal showString method that is
+    //       supposedly only for the Python API; look into why
+    val stringOutput = {
+      val outputStream = new ByteArrayOutputStream()
+      Console.withOut(outputStream) {
+        // TODO: Provide some way to change the number of records shown
+        result.show(10)
+      }
+      outputStream.toString("UTF-8")
+    }
+
+    stringOutput
+  }
+
+  /** Stops the running broker service. */
+  override def stop(): Unit = _isRunning = false
+
+  /** Starts the broker service. */
+  override def start(): Unit = _isRunning = true
+}
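
submitCode runs the query on the kernel's SQLContext and hands back whatever DataFrame.show prints, truncated to 10 rows. The capture pattern in isolation, as a hedged sketch against the Spark 1.x DataFrame API used here:

    import java.io.ByteArrayOutputStream
    import org.apache.spark.sql.DataFrame

    // Redirect Console.out while show() prints so the rows come back as a plain string.
    def captureShow(df: DataFrame, rows: Int = 10): String = {
      val buffer = new ByteArrayOutputStream()
      Console.withOut(buffer) { df.show(rows) }
      buffer.toString("UTF-8")
    }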

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala
new file mode 100644
index 0000000..114c97f
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import com.ibm.spark.interpreter.broker.BrokerTransformer
+
+/**
+ * Represents the transformer used by Spark SQL.
+ */
+class SqlTransformer extends BrokerTransformer

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala
new file mode 100644
index 0000000..a2fbd10
--- /dev/null
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.sql
+
+import com.ibm.spark.interpreter.broker.BrokerTypesProvider
+
+/**
+ * Represents all types associated with the SQL interface.
+ */
+object SqlTypes extends BrokerTypesProvider