You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:24 UTC

[16/51] [abbrv] incubator-toree git commit: Moved scala files to new locations based on new package

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/InterpreterActor.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/InterpreterActor.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/InterpreterActor.scala
new file mode 100644
index 0000000..b5ae174
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/InterpreterActor.scala
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.interpreter
+
+import java.io.OutputStream
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.pattern.{ask, pipe}
+import akka.util.Timeout
+import com.ibm.spark.interpreter.Interpreter
+import com.ibm.spark.kernel.protocol.v5.KernelMessage
+import com.ibm.spark.kernel.protocol.v5.interpreter.tasks._
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.ibm.spark.interpreter._
+import com.ibm.spark.utils.LogLike
+
+import scala.concurrent.duration._
+
+object InterpreterActor {
+  /**
+   * Builds the Props used to create an InterpreterActor for an interpreter.
+   *
+   * The InterpreterActor constructor takes an InterpreterTaskFactory rather
+   * than an Interpreter, so the interpreter is wrapped in a factory here;
+   * handing the interpreter itself to Props would not match any constructor
+   * of the actor class.
+   *
+   * @param interpreter The interpreter the actor's child tasks will use
+   * @return The Props describing how to construct the InterpreterActor
+   */
+  def props(interpreter: Interpreter): Props =
+    Props(classOf[InterpreterActor], new InterpreterTaskFactory(interpreter))
+}
+
+// TODO: Investigate restart sequence
+//
+// http://doc.akka.io/docs/akka/2.2.3/general/supervision.html
+//
+// "create new actor instance by invoking the originally provided factory again"
+//
+// Does this mean that the interpreter instance is not gc and is passed in?
+//
+/**
+ * Actor that forwards execute and completion requests to child task actors
+ * and pipes each reply (or an ExecuteError built from a failure) back to the
+ * sender of the request.
+ *
+ * @param interpreterTaskFactory Factory used in preStart to create the child
+ *                               task actors
+ */
+class InterpreterActor(
+  interpreterTaskFactory: InterpreterTaskFactory
+) extends Actor with LogLike {
+  // NOTE: Supplies the execution context required by the futures below
+  import context._
+
+  // NOTE: The ask pattern (?) requires an implicit timeout in scope
+  implicit val timeout: Timeout = Timeout(21474835.seconds)
+
+  // Child actors of the interpreter; both are assigned in preStart
+  private var executeRequestTask: ActorRef = _
+  private var completeCodeTask: ActorRef = _
+
+  /**
+   * Creates the child actors performing tasks for the interpreter, naming
+   * each one after its InterpreterChildActorType value.
+   */
+  override def preStart(): Unit = {
+    val executeTaskName = InterpreterChildActorType.ExecuteRequestTask.toString
+    executeRequestTask =
+      interpreterTaskFactory.ExecuteRequestTask(context, executeTaskName)
+
+    val completeTaskName = InterpreterChildActorType.CodeCompleteTask.toString
+    completeCodeTask =
+      interpreterTaskFactory.CodeCompleteTask(context, completeTaskName)
+  }
+
+  /**
+   * Converts a failure into the Right(ExecuteError) reply sent to requesters,
+   * carrying the exception class name, localized message, and stack trace.
+   */
+  private def asExecuteError(failure: Throwable) =
+    Right(ExecuteError(
+      failure.getClass.getName,
+      failure.getLocalizedMessage,
+      failure.getStackTrace.map(_.toString).toList
+    ))
+
+  override def receive: Receive = {
+    case (executeRequest: ExecuteRequest, parentMessage: KernelMessage,
+      outputStream: OutputStream) =>
+      val taskInput = (executeRequest, parentMessage, outputStream)
+      val reply = (executeRequestTask ? taskInput).recover {
+        case failure: Throwable =>
+          logger.error(s"Could not execute code ${executeRequest.code} because "
+            + s"of exception: ${failure.getMessage}")
+          asExecuteError(failure)
+      }
+      reply pipeTo sender
+
+    case (completeRequest: CompleteRequest) =>
+      logger.debug(s"InterpreterActor requesting code completion for code " +
+        s"${completeRequest.code}")
+      val reply = (completeCodeTask ? completeRequest).recover {
+        case failure: Throwable =>
+          logger.error(s"Could not complete code ${completeRequest.code}: " +
+            s"${failure.getMessage}")
+          asExecuteError(failure)
+      }
+      reply pipeTo sender
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/package.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/package.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/package.scala
new file mode 100644
index 0000000..6738520
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/package.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5
+
+package object interpreter {
+  /**
+   * Identifies the child actors created by the interpreter actor. The string
+   * form of each value is what InterpreterActor passes as the child's name.
+   */
+  object InterpreterChildActorType extends Enumeration {
+    type InterpreterChildActorType = Value
+
+    val ExecuteRequestTask: Value = Value("execute_request_task")
+    val CodeCompleteTask: Value = Value("code_complete_task")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/CodeCompleteTaskActor.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/CodeCompleteTaskActor.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/CodeCompleteTaskActor.scala
new file mode 100644
index 0000000..87f37b0
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/CodeCompleteTaskActor.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.interpreter.tasks
+
+import akka.actor.{Actor, Props}
+import com.ibm.spark.interpreter.Interpreter
+import com.ibm.spark.kernel.protocol.v5.content.CompleteRequest
+import com.ibm.spark.utils.LogLike
+
+object CodeCompleteTaskActor {
+  /**
+   * Builds the Props for a CodeCompleteTaskActor backed by the interpreter.
+   *
+   * @param interpreter The interpreter passed to the actor's constructor
+   * @return The Props describing how to construct the actor
+   */
+  def props(interpreter: Interpreter): Props = {
+    val actorClass = classOf[CodeCompleteTaskActor]
+    Props(actorClass, interpreter)
+  }
+}
+
+/**
+ * Actor that answers CompleteRequest messages with the result of the
+ * interpreter's completion for the requested code and cursor position.
+ *
+ * @param interpreter The interpreter queried for completions; must not be
+ *                    null
+ */
+class CodeCompleteTaskActor(interpreter: Interpreter)
+  extends Actor with LogLike {
+  require(interpreter != null)
+
+  override def receive: Receive = {
+    case request: CompleteRequest =>
+      logger.debug("Invoking the interpreter completion")
+      val completion = interpreter.completion(request.code, request.cursor_pos)
+      sender ! completion
+
+    case _ =>
+      // TODO: Provide a failure message type to be passed around?
+      sender ! "Unknown message"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/ExecuteRequestTaskActor.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/ExecuteRequestTaskActor.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/ExecuteRequestTaskActor.scala
new file mode 100644
index 0000000..dc22b21
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/ExecuteRequestTaskActor.scala
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.interpreter.tasks
+
+import java.io.OutputStream
+
+import akka.actor.{Props, Actor}
+import com.ibm.spark.global.StreamState
+import com.ibm.spark.interpreter.{ExecuteAborted, Results, ExecuteError, Interpreter}
+import com.ibm.spark.kernel.api.StreamInfo
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.ibm.spark.security.KernelSecurityManager
+import com.ibm.spark.utils.{ConditionalOutputStream, MultiOutputStream, LogLike}
+
+object ExecuteRequestTaskActor {
+  /**
+   * Builds the Props for an ExecuteRequestTaskActor backed by the interpreter.
+   *
+   * @param interpreter The interpreter passed to the actor's constructor
+   * @return The Props describing how to construct the actor
+   */
+  def props(interpreter: Interpreter): Props = {
+    val actorClass = classOf[ExecuteRequestTaskActor]
+    Props(actorClass, interpreter)
+  }
+}
+
+/**
+ * Actor that runs the code of an execute request through the interpreter and
+ * replies to the sender with Left(output) on success or Right(failure)
+ * otherwise. Blank code is answered with Left("") without interpreting.
+ *
+ * @param interpreter The interpreter used to evaluate code; must not be null
+ */
+class ExecuteRequestTaskActor(interpreter: Interpreter) extends Actor with LogLike {
+  require(interpreter != null)
+
+  override def receive: Receive = {
+    case (executeRequest: ExecuteRequest, parentMessage: KernelMessage,
+      outputStream: OutputStream) =>
+      val code = executeRequest.code.trim
+      if (code == "") {
+        // If we get empty code from a cell then just return ExecuteReplyOk
+        sender ! Left("")
+      } else {
+        //interpreter.updatePrintStreams(System.in, outputStream, outputStream)
+
+        // Update our global streams to be used by future output
+        // NOTE: This is not async-safe! This is expected to be broken when
+        //       running asynchronously! Use an alternative for data
+        //       communication!
+        StreamState.setStreams(
+          System.in,
+          buildOutputStream(outputStream, System.out),
+          buildOutputStream(outputStream, System.err)
+        )
+
+            // Add our parent message with StreamInfo type included
+//            interpreter.doQuietly {
+//              interpreter.bind(
+//                "$streamInfo",
+//                "com.ibm.spark.kernel.api.StreamInfo",
+//                new KernelMessage(
+//                  ids = parentMessage.ids,
+//                  signature = parentMessage.signature,
+//                  header = parentMessage.header,
+//                  parentHeader = parentMessage.parentHeader,
+//                  metadata = parentMessage.metadata,
+//                  contentString = parentMessage.contentString
+//                ) with StreamInfo,
+//                List( """@transient""", """implicit""")
+//              )
+              // TODO: Think of a cleaner wrapper to handle updating the Console
+              //       input and output streams
+//              interpreter.interpret(
+//                """val $updateOutput = {
+//                Console.setIn(System.in)
+//                Console.setOut(System.out)
+//                Console.setErr(System.err)
+//              }""".trim)
+//            }
+        val (status, outcome) = interpreter.interpret(code)
+
+        logger.debug(s"Interpreter execution result was ${status}")
+        status match {
+          case Results.Success =>
+            sender ! Left(outcome.left.get)
+          case Results.Error =>
+            sender ! Right(outcome.right.get)
+          case Results.Aborted =>
+            sender ! Right(new ExecuteAborted)
+          case Results.Incomplete =>
+            // An incomplete result is most likely a syntax error, so let the
+            // user know.
+            sender ! Right(new ExecuteError("Syntax Error.", "", List()))
+        }
+      }
+
+    case other =>
+      logger.warn(s"Received unknown message type ${other}")
+      // TODO: Provide a failure message type to be passed around?
+      sender ! "Unknown message"
+  }
+
+  /**
+   * Wraps two streams in a MultiOutputStream: the new output guarded by the
+   * restricted-thread check and the default output guarded by its negation.
+   *
+   * @param newOutput The stream paired with the restricted-thread condition
+   * @param defaultOutput The stream paired with the negated condition
+   * @return The MultiOutputStream combining both conditional streams
+   */
+  private def buildOutputStream(
+    newOutput: OutputStream,
+    defaultOutput: OutputStream
+  ) = {
+    // Kept as a def so every reference re-evaluates the current thread's
+    // group. NOTE(review): presumably ConditionalOutputStream takes its
+    // condition by name -- confirm against its definition.
+    def isRestrictedThread = {
+      val threadGroup = Thread.currentThread().getThreadGroup
+      val restrictedName = KernelSecurityManager.RestrictedGroupName
+
+      threadGroup != null && threadGroup.getName == restrictedName
+    }
+
+    val toNewOutput =
+      new ConditionalOutputStream(newOutput, isRestrictedThread)
+    val toDefaultOutput =
+      new ConditionalOutputStream(defaultOutput, !isRestrictedThread)
+
+    new MultiOutputStream(List[OutputStream](toNewOutput, toDefaultOutput))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/InterpreterTaskFactory.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/InterpreterTaskFactory.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/InterpreterTaskFactory.scala
new file mode 100644
index 0000000..5f5a7ad
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/InterpreterTaskFactory.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.interpreter.tasks
+
+import akka.actor.{ActorRefFactory, ActorRef}
+import com.ibm.spark.interpreter.Interpreter
+
+/**
+ * Creates the task actors used by the interpreter actor, handing the same
+ * interpreter to each one.
+ *
+ * @param interpreter The interpreter given to every task actor created
+ */
+class InterpreterTaskFactory(interpreter: Interpreter) {
+  /**
+   * Creates a new actor for the execute request task.
+   *
+   * @param actorRefFactory The factory under which the task actor is created
+   * @param name The name given to the new actor
+   * @return The ActorRef created for the task
+   */
+  def ExecuteRequestTask(actorRefFactory: ActorRefFactory, name: String): ActorRef = {
+    val taskProps = ExecuteRequestTaskActor.props(interpreter)
+    actorRefFactory.actorOf(taskProps, name)
+  }
+
+  /**
+   * Creates a new actor for the code completion task.
+   *
+   * @param actorRefFactory The factory under which the task actor is created
+   * @param name The name given to the new actor
+   * @return The ActorRef created for the task
+   */
+  def CodeCompleteTask(actorRefFactory: ActorRefFactory, name: String): ActorRef = {
+    val taskProps = CodeCompleteTaskActor.props(interpreter)
+    actorRefFactory.actorOf(taskProps, name)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/ActorLoader.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/ActorLoader.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/ActorLoader.scala
new file mode 100644
index 0000000..171934d
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/ActorLoader.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel
+
+import akka.actor.{ActorRefFactory, ActorSelection}
+
+/**
+ * This trait defines the interface for loading actors based on some value
+ * (enum, attribute, etc...). The thought is to allow external consumers to
+ * acquire actors through a common interface, minimizing the spread of the
+ * logic about the Actors, ActorSystem, and other similar concepts.
+ */
+trait ActorLoader {
+  /**
+   * Finds the actor associated with an enum value. The enum value can map to
+   * an actor associated with handling a specific kernel message, a socket
+   * type, or other functionality; how the value is resolved is left to the
+   * implementation.
+   *
+   * @param actorEnum The enum value used to load the actor
+   *
+   * @return An ActorSelection to pass messages to
+   */
+  def load(actorEnum: Enumeration#Value): ActorSelection
+}
+
+/**
+ * ActorLoader that resolves an enum value to the actor selection at the path
+ * "/user/" followed by the value's string form.
+ *
+ * @param actorRefFactory The factory used to create the actor selection
+ */
+case class SimpleActorLoader(actorRefFactory: ActorRefFactory)
+  extends ActorLoader
+{
+  override def load(actorEnum: Enumeration#Value): ActorSelection = {
+    val actorPath = s"/user/${actorEnum.toString}"
+    actorRefFactory.actorSelection(actorPath)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
new file mode 100644
index 0000000..73cf1b1
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel
+
+import java.nio.charset.Charset
+
+import akka.util.{ByteString, Timeout}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.utils.LogLike
+import play.api.data.validation.ValidationError
+import play.api.libs.json.{JsPath, Json, Reads}
+
+import scala.concurrent.duration._
+
+object Utilities extends LogLike {
+  //
+  // NOTE: This is brought in to remove feature warnings regarding the use of
+  //       implicit conversions regarding the following:
+  //
+  //       1. ByteStringToString
+  //       2. ZMQMessageToKernelMessage
+  //
+  import scala.language.implicitConversions
+
+  // Single charset for every String <-> ByteString conversion below, so that
+  // encoding and decoding agree regardless of the platform default charset.
+  private val Utf8: Charset = Charset.forName("UTF-8")
+
+  /**
+   * This timeout needs to be defined for the Akka asks to timeout
+   */
+  implicit val timeout: Timeout = Timeout(21474835.seconds)
+
+  /**
+   * Decodes the bytes of a ByteString as a UTF-8 string.
+   *
+   * @param byteString The bytes to decode
+   * @return The decoded string
+   */
+  implicit def ByteStringToString(byteString : ByteString) : String = {
+    new String(byteString.toArray, Utf8)
+  }
+
+  /**
+   * Encodes a string as UTF-8 bytes wrapped in a ByteString. UTF-8 is used
+   * explicitly (rather than the platform default) so the result round-trips
+   * through ByteStringToString.
+   *
+   * @param string The string to encode
+   * @return The encoded bytes
+   */
+  implicit def StringToByteString(string : String) : ByteString = {
+    ByteString(string.getBytes(Utf8))
+  }
+
+  /**
+   * Builds a KernelMessage from the frames of a ZMQMessage. Frames before the
+   * "<IDS|MSG>" delimiter are the ids; the frames after it are read, in
+   * order, as signature, header, parent header, metadata, and content.
+   * A parent header that fails validation is replaced with null.
+   *
+   * @param message The ZeroMQ message whose frames are parsed
+   * @return The kernel message built from the frames
+   */
+  implicit def ZMQMessageToKernelMessage(message: ZMQMessage): KernelMessage = {
+    val delimiterIndex: Int =
+      message.frames.indexOf(ByteString("<IDS|MSG>".getBytes(Utf8)))
+    //  TODO Handle the case where there is no delimiter
+    val ids: Seq[String] =
+      message.frames.take(delimiterIndex).map(
+        // Decode ids with the same charset as every other frame
+        (byteString : ByteString) => new String(byteString.toArray, Utf8)
+      )
+    val header = Json.parse(message.frames(delimiterIndex + 2)).as[Header]
+    // TODO: Investigate better solution than setting parentHeader to null for {}
+    val parentHeader = parseAndHandle(message.frames(delimiterIndex + 3),
+                                  ParentHeader.headerReads,
+                                  handler = (valid: ParentHeader) => valid,
+                                  errHandler = _ => null
+    )
+    val metadata = Json.parse(message.frames(delimiterIndex + 4)).as[Metadata]
+
+    KMBuilder().withIds(ids.toList)
+               .withSignature(message.frame(delimiterIndex + 1))
+               .withHeader(header)
+               .withParentHeader(parentHeader)
+               .withMetadata(metadata)
+               .withContentString(message.frame(delimiterIndex + 5)).build(false)
+  }
+
+  /**
+   * Builds a ZMQMessage from a KernelMessage: one frame per id, then the
+   * "<IDS|MSG>" delimiter, the signature, the header, parent header and
+   * metadata serialized as JSON, and finally the content string.
+   *
+   * @param kernelMessage The kernel message to serialize into frames
+   * @return The ZeroMQ message holding the frames
+   */
+  implicit def KernelMessageToZMQMessage(kernelMessage : KernelMessage) : ZMQMessage = {
+    val frames: scala.collection.mutable.ListBuffer[ByteString] = scala.collection.mutable.ListBuffer()
+    // Appending is a side effect, so iterate with foreach rather than map
+    kernelMessage.ids.foreach((id : String) => frames += id )
+    frames += "<IDS|MSG>"
+    frames += kernelMessage.signature
+    frames += Json.toJson(kernelMessage.header).toString()
+    frames += Json.toJson(kernelMessage.parentHeader).toString()
+    frames += Json.toJson(kernelMessage.metadata).toString
+    frames += kernelMessage.contentString
+    ZMQMessage(frames  : _*)
+  }
+
+  /**
+   * Parses and validates JSON, passing the valid result to the handler. When
+   * validation fails the error is logged and a Throwable is thrown.
+   *
+   * @param json The JSON text to parse
+   * @param reads The Reads used to validate the parsed JSON
+   * @param handler Invoked with the validated value
+   * @return The result of the handler
+   */
+  def parseAndHandle[T, U](json: String, reads: Reads[T],
+                           handler: T => U) : U = {
+    parseAndHandle(json, reads, handler,
+      (invalid: Seq[(JsPath, Seq[ValidationError])]) => {
+        logger.error(s"Could not parse JSON, ${json}")
+        throw new Throwable(s"Could not parse JSON, ${json}")
+      }
+    )
+  }
+
+  /**
+   * Parses and validates JSON, passing the valid result to the handler or
+   * the validation errors to the error handler.
+   *
+   * @param json The JSON text to parse
+   * @param reads The Reads used to validate the parsed JSON
+   * @param handler Invoked with the validated value
+   * @param errHandler Invoked with the validation errors
+   * @return The result of whichever handler was invoked
+   */
+  def parseAndHandle[T, U](json: String, reads: Reads[T],
+                           handler: T => U,
+                           errHandler: Seq[(JsPath, Seq[ValidationError])] => U) : U = {
+    Json.parse(json).validate[T](reads).fold(
+      errHandler,
+      (content: T) => handler(content)
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Control.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Control.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Control.scala
new file mode 100644
index 0000000..df42b03
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Control.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import com.ibm.spark.kernel.protocol.v5.SystemActorType
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+
+/**
+ * The server endpoint for control messages specified in the IPython Kernel Spec.
+ * All behavior comes from ZeromqKernelMessageSocket, which is given the
+ * factory's Control socket creator and a function that loads the
+ * KernelMessageRelay actor each time it is invoked.
+ *
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ * @param actorLoader The actor loader to use to load the relay for kernel
+ *                    messages
+ */
+class Control(socketFactory: SocketFactory, actorLoader: ActorLoader)
+  extends ZeromqKernelMessageSocket(
+    socketFactory.Control,
+    () => actorLoader.load(SystemActorType.KernelMessageRelay)
+  )
+{
+  logger.trace("Created new Control actor")
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Heartbeat.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Heartbeat.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Heartbeat.scala
new file mode 100644
index 0000000..d17c360
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Heartbeat.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import akka.actor.Actor
+import akka.util.ByteString
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.utils.LogLike
+
+/**
+ * The server endpoint for heartbeat messages specified in the IPython Kernel
+ * Spec. Every ZMQMessage received is logged at trace level and then sent on
+ * to the heartbeat socket.
+ *
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ */
+class Heartbeat(socketFactory : SocketFactory) extends Actor with LogLike {
+  logger.debug("Created new Heartbeat actor")
+  val socket = socketFactory.Heartbeat(context.system, self)
+
+  override def receive: Receive = {
+    case message: ZMQMessage =>
+      // One line of log text per frame of the message
+      val frameText = message.frames
+        .map((frame: ByteString) => new String(frame.toArray))
+        .mkString("\n")
+      logger.trace(s"Heartbeat received message: $frameText")
+      socket ! message
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/IOPub.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/IOPub.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/IOPub.scala
new file mode 100644
index 0000000..d2ff8e9
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/IOPub.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import akka.actor.Actor
+import akka.util.ByteString
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.communication.utils.OrderedSupport
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.kernel.Utilities
+import Utilities._
+import com.ibm.spark.utils.{MessageLogSupport, LogLike}
+
+/**
+ * The server endpoint for IOPub messages specified in the IPython Kernel
+ * Spec. Each KernelMessage received is converted to a ZMQMessage, logged, and
+ * sent to the IOPub socket.
+ *
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ */
+class IOPub(socketFactory: SocketFactory)
+  extends Actor with MessageLogSupport with OrderedSupport
+{
+  logger.trace("Created new IOPub actor")
+  val socket = socketFactory.IOPub(context.system)
+
+  override def receive: Receive = {
+    case message: KernelMessage =>
+      withProcessing {
+        // Convert before logging so a conversion failure prevents the log entry
+        val outgoing: ZMQMessage = KernelMessageToZMQMessage(message)
+        logMessage(message)
+        socket ! outgoing
+      }
+  }
+
+  /**
+   * Lists the message types that are stashed by {@link #waiting() waiting}
+   * while the Actor is in processing state.
+   *
+   * @return A sequence containing only the KernelMessage class
+   */
+  override def orderedTypes(): Seq[Class[_]] = {
+    Seq(classOf[KernelMessage])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Shell.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Shell.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Shell.scala
new file mode 100644
index 0000000..b5c4bfb
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Shell.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.SystemActorType
+
+/**
+ * The server endpoint for shell messages specified in the IPython Kernel Spec.
+ * All behavior comes from ZeromqKernelMessageSocket, which is given the
+ * factory's Shell socket creator and a function that loads the
+ * KernelMessageRelay actor each time it is invoked.
+ *
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ * @param actorLoader The actor loader to use to load the relay for kernel
+ *                    messages
+ */
+class Shell(socketFactory: SocketFactory, actorLoader: ActorLoader)
+  extends ZeromqKernelMessageSocket(
+    socketFactory.Shell,
+    () => actorLoader.load(SystemActorType.KernelMessageRelay)
+  )
+{
+  logger.trace("Created new Shell actor")
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConfig.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConfig.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConfig.scala
new file mode 100644
index 0000000..5fe3e39
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConfig.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import com.typesafe.config.Config
+import play.api.libs.json.Json
+
+/**
+ * Settings describing the kernel's sockets. The field names double as the
+ * keys looked up by `SocketConfig.fromConfig`, and the JSON reader/writer in
+ * the companion object are generated from them, so they must not be renamed.
+ * @param stdin_port Port of the stdin connection
+ * @param control_port Port of the control connection
+ * @param hb_port Port of the heartbeat connection
+ * @param shell_port Port of the shell connection
+ * @param iopub_port Port of the IOPub connection
+ * @param ip Hostname or ip address shared by every connection
+ * @param transport Protocol portion shared by every connection (e.g. tcp)
+ * @param signature_scheme Name of the signature scheme (presumably used for
+ *                         message signing; not consumed in this file)
+ * @param key Key accompanying the signature scheme (not consumed in this
+ *            file)
+ */
+case class SocketConfig(
+  stdin_port: Int,
+  control_port: Int,
+  hb_port: Int,
+  shell_port: Int,
+  iopub_port: Int,
+  ip: String,
+  transport: String,
+  signature_scheme: String,
+  key: String
+)
+
+object SocketConfig {
+  // JSON (de)serializers generated from the case class fields
+  implicit val socketConfigReads = Json.reads[SocketConfig]
+  implicit val socketConfigWrites = Json.writes[SocketConfig]
+
+  /**
+   * Builds a SocketConfig by reading one entry per field from the given
+   * configuration; every key has the same name as the field it populates.
+   * @param config The configuration holding the socket settings
+   * @return The socket configuration populated from the config entries
+   */
+  def fromConfig(config: Config): SocketConfig =
+    SocketConfig(
+      stdin_port = config.getInt("stdin_port"),
+      control_port = config.getInt("control_port"),
+      hb_port = config.getInt("hb_port"),
+      shell_port = config.getInt("shell_port"),
+      iopub_port = config.getInt("iopub_port"),
+      ip = config.getString("ip"),
+      transport = config.getString("transport"),
+      signature_scheme = config.getString("signature_scheme"),
+      key = config.getString("key")
+    )
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConnection.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConnection.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConnection.scala
new file mode 100644
index 0000000..865c383
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConnection.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+object SocketConnection {
+  /**
+   * Creates a new connection description.
+   * @param protocol The protocol portion of the connection (e.g. tcp)
+   * @param ip The hostname or ip address portion of the connection
+   * @param port The port portion of the connection
+   * @return The new SocketConnection instance
+   */
+  def apply(protocol: String, ip: String, port: Int): SocketConnection =
+    new SocketConnection(protocol, ip, port)
+}
+
+/**
+ * Represent a connection string for a socket
+ * @param protocol The protocol portion of the connection (e.g. tcp, akka, udp)
+ * @param ip The hostname or ip address to bind on (e.g. *, myhost, 127.0.0.1)
+ * @param port The port for the socket to listen on
+ */
+class SocketConnection(protocol: String, ip: String, port: Int) {
+  /**
+   * Renders the connection in the form "protocol://ip:port".
+   *
+   * String interpolation is used instead of a "%d" format specifier because
+   * format specifiers localize digits according to the JVM's default locale,
+   * which could yield a port written with non-ASCII digits.
+   * @return The connection string
+   */
+  override def toString: String = s"$protocol://$ip:$port"
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketFactory.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketFactory.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketFactory.scala
new file mode 100644
index 0000000..ef2001f
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketFactory.scala
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import akka.actor.{Props, ActorRef, ActorSystem}
+import com.ibm.spark.communication.actors.{RouterSocketActor, RepSocketActor, PubSocketActor}
+
+object SocketFactory {
+  /**
+   * Creates a factory for the given socket configuration.
+   * @param socketConfig The configuration handed to the new factory
+   * @return The new SocketFactory instance
+   */
+  def apply(socketConfig: SocketConfig): SocketFactory =
+    new SocketFactory(socketConfig)
+}
+
+/**
+ * A factory creating the socket actors for the connections of the IPython
+ * Kernel Spec (heartbeat, shell, control, stdin, and IOPub).
+ * @param socketConfig The configuration supplying the transport, ip, and port
+ *                     of each connection
+ */
+class SocketFactory(socketConfig: SocketConfig) {
+  /** Describes a connection on the given port using the shared transport/ip */
+  private def connectionOn(port: Int): SocketConnection =
+    SocketConnection(socketConfig.transport, socketConfig.ip, port)
+
+  val HeartbeatConnection: SocketConnection =
+    connectionOn(socketConfig.hb_port)
+  val ShellConnection: SocketConnection =
+    connectionOn(socketConfig.shell_port)
+  val ControlConnection: SocketConnection =
+    connectionOn(socketConfig.control_port)
+  val IOPubConnection: SocketConnection =
+    connectionOn(socketConfig.iopub_port)
+  val StdinConnection: SocketConnection =
+    connectionOn(socketConfig.stdin_port)
+
+  /** Starts a RouterSocketActor for the connection with the given listener */
+  private def routerSocket(
+    system: ActorSystem,
+    connection: SocketConnection,
+    listener: ActorRef
+  ): ActorRef = system.actorOf(
+    Props(classOf[RouterSocketActor], connection.toString, listener)
+  )
+
+  /**
+   * Creates a RepSocketActor representing the server endpoint for heartbeat
+   * messages
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   * @return The ActorRef created for the socket connection
+   */
+  def Heartbeat(system: ActorSystem, listener: ActorRef) : ActorRef =
+    system.actorOf(
+      Props(classOf[RepSocketActor], HeartbeatConnection.toString, listener)
+    )
+
+  /**
+   * Creates a RouterSocketActor representing the server endpoint for shell
+   * messages
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   * @return The ActorRef created for the socket connection
+   */
+  def Shell(system: ActorSystem, listener: ActorRef) : ActorRef =
+    routerSocket(system, ShellConnection, listener)
+
+  /**
+   * Creates a RouterSocketActor representing the server endpoint for control
+   * messages
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   * @return The ActorRef created for the socket connection
+   */
+  def Control(system: ActorSystem, listener: ActorRef) : ActorRef =
+    routerSocket(system, ControlConnection, listener)
+
+  /**
+   * Creates a RouterSocketActor representing the server endpoint for stdin
+   * messages
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   * @return The ActorRef created for the socket connection
+   */
+  def Stdin(system: ActorSystem, listener: ActorRef) : ActorRef =
+    routerSocket(system, StdinConnection, listener)
+
+  /**
+   * Creates a PubSocketActor representing the server endpoint for IOPub
+   * messages
+   * @param system The actor system the socket actor will belong to
+   * @return The ActorRef created for the socket connection
+   */
+  def IOPub(system: ActorSystem) : ActorRef =
+    system.actorOf(Props(classOf[PubSocketActor], IOPubConnection.toString))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Stdin.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Stdin.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Stdin.scala
new file mode 100644
index 0000000..261151e
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Stdin.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.SystemActorType
+
+/**
+ * Kernel-side endpoint for stdin messages as laid out in the IPython Kernel
+ * Spec.
+ * @param socketFactory Factory whose Stdin method supplies the socket actor
+ * @param actorLoader Loader used to look up the kernel message relay that
+ *                    incoming messages are forwarded to
+ */
+class Stdin(socketFactory: SocketFactory, actorLoader: ActorLoader)
+  extends ZeromqKernelMessageSocket(
+    actorSocketFunc = socketFactory.Stdin,
+    actorForwardFunc =
+      () => actorLoader.load(SystemActorType.KernelMessageRelay)
+  )
+{
+  logger.trace("Created new Stdin actor")
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/ZeromqKernelMessageSocket.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/ZeromqKernelMessageSocket.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/ZeromqKernelMessageSocket.scala
new file mode 100644
index 0000000..3d02f0b
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/ZeromqKernelMessageSocket.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import java.nio.charset.Charset
+
+import akka.actor.{ActorSelection, ActorSystem, ActorRef, Actor}
+import akka.util.ByteString
+import com.ibm.spark.communication.ZMQMessage
+
+//import com.ibm.spark.kernel.protocol.v5.kernel.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5.KernelMessage
+import com.ibm.spark.kernel.protocol.v5.kernel.Utilities._
+import com.ibm.spark.utils.MessageLogSupport
+
+/**
+ * Represents a generic socket geared toward two-way communication using
+ * ZeroMQ and KernelMessage structures.
+ *
+ * Incoming ZMQMessages are converted to KernelMessages and forwarded together
+ * with the decoded trailing frames; outgoing KernelMessages are converted to
+ * ZMQMessages and handed to the socket actor.
+ * @param actorSocketFunc The function used to retrieve the actor for outgoing
+ *                        communication via sockets
+ * @param actorForwardFunc The function used to retrieve the actor for incoming
+ *                         kernel messages
+ */
+abstract class ZeromqKernelMessageSocket(
+  actorSocketFunc: (ActorSystem, ActorRef) => ActorRef,
+  actorForwardFunc: () => ActorSelection
+) extends Actor with MessageLogSupport {
+  val actorSocketRef = actorSocketFunc(context.system, self)
+  val actorForwardRef = actorForwardFunc()
+
+  override def receive: Receive = {
+    case message: ZMQMessage =>
+      // The type ascription triggers the implicit ZMQMessage conversion
+      val kernelMessage: KernelMessage = message
+      logMessage(kernelMessage)
+
+      // Grab the strings to use for signature verification. Only the last
+      // four frames are kept, so the others are dropped before decoding and
+      // the charset is looked up once per message instead of once per frame.
+      // TODO: This assumes NO extra buffers, refactor?
+      val utf8 = Charset.forName("UTF-8")
+      val zmqStrings = message.frames.takeRight(4).map(
+        (byteString: ByteString) => new String(byteString.toArray, utf8)
+      )
+
+      // Forward along our message (along with the strings used for
+      // signatures)
+      actorForwardRef ! ((zmqStrings, kernelMessage))
+
+    case message: KernelMessage =>
+      // The type ascription triggers the implicit KernelMessage conversion
+      val zmqMessage: ZMQMessage = message
+      logMessage(message)
+      actorSocketRef ! zmqMessage
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/MagicParser.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/MagicParser.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/MagicParser.scala
new file mode 100644
index 0000000..a24c062
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/MagicParser.scala
@@ -0,0 +1,145 @@
+package com.ibm.spark.kernel.protocol.v5.magic
+
+import com.ibm.spark.magic.MagicLoader
+
+/**
+ * Rewrites magic invocations found in submitted code ("%name args" for line
+ * magics, "%%name args" for cell magics) into calls on the kernel's magics
+ * object.
+ *
+ * Results follow this class's own convention: Left carries the (possibly
+ * rewritten) code, while Right carries an error message.
+ * @param magicLoader Used to check whether a named line or cell magic exists
+ */
+class MagicParser(magicLoader: MagicLoader) {
+  private val magicRegex = """^[%]{1,2}(\w+)""".r
+  protected[magic] val kernelObjectName = "kernel.magics"
+
+  /**
+   * Determines whether a given line of code represents a line magic.
+   * @param codeLine a single line of code
+   * @return true if the line starts with exactly one "%"
+   */
+  private def isLineMagic(codeLine: String) =
+    codeLine.startsWith("%") && !isCellMagic(codeLine)
+
+  /**
+   * Determines whether a given string of code represents a cell magic.
+   * @param codeBlob a string of code separated by newlines
+   * @return true if the code starts with "%%"
+   */
+  private def isCellMagic(codeBlob: String) = codeBlob.startsWith("%%")
+
+  /**
+   * Finds a "magic string" i.e. "%%magic" or "%magic" at the very start of a
+   * given code string, and separates the magic name from the code that
+   * follows it.
+   *
+   * E.g.
+   * "%magic foo bar" -> ("magic", "foo bar")
+   * @param codeBlob a string of code separated by newlines
+   * @return Some((magicName, args)), or None if the code does not start with
+   *         one or two "%" immediately followed by a word character
+   */
+  protected[magic] def parseMagic(codeBlob: String): Option[(String, String)] =
+    magicRegex.findFirstMatchIn(codeBlob).map { m =>
+      (m.group(1), m.after(1).toString.trim)
+    }
+
+  /**
+   * Given a line of code representing a magic invocation determines whether
+   * the magic has an implementation.
+   * @param codeLine a single line of code
+   * @return true if the magic exists and is a line magic
+   */
+  protected[magic] def isValidLineMagic(codeLine: String): Boolean =
+    isLineMagic(codeLine) && parseMagic(codeLine).exists {
+      case (magicName, _) => magicLoader.hasLineMagic(magicName)
+    }
+
+  /**
+   * Given a blob of code, finds any magic invocations of magics that don't
+   * exist.
+   *
+   * Lines that merely start with "%" without naming a magic (e.g. "% 3")
+   * cannot be parsed into a magic name; they are not reported here and are
+   * left to be treated as ordinary code rather than failing the whole parse.
+   * @param codeBlob a string of code separated by newlines
+   * @return invalid magic names from the given code blob
+   */
+  protected[magic] def parseOutInvalidMagics(codeBlob: String): List[String] =
+    codeBlob.split("\n").toList
+      .filter(isLineMagic)
+      .flatMap(line => parseMagic(line).toList)
+      .collect {
+        case (magicName, _) if !magicLoader.hasLineMagic(magicName) =>
+          magicName
+      }
+
+  /**
+   * Formats a given magic name and args to code for a kernel method call.
+   * @param magicName the name of the magic
+   * @param args the arguments to the magic
+   * @return equivalent kernel method call
+   */
+  protected[magic] def substitute(magicName: String, args: String): String =
+    s"""$kernelObjectName.$magicName(\"\"\"$args\"\"\")"""
+
+  /**
+   * Formats a given line of code representing a line magic invocation into an
+   * equivalent kernel object call if the magic invocation is valid.
+   * @param codeLine the line of code to convert.
+   * @return a substituted line of code if valid else the original line
+   */
+  protected[magic] def substituteLine(codeLine: String): String =
+    parseMagic(codeLine) match {
+      case Some((magicName, args)) if isValidLineMagic(codeLine) =>
+        substitute(magicName, args)
+      case _ => codeLine
+    }
+
+  /**
+   * Formats a given code blob representing a cell magic invocation into an
+   * equivalent kernel object call if the cell magic invocation is valid. An
+   * error message is returned if not. Code that cannot be parsed into a magic
+   * name is returned unchanged.
+   * @param codeBlob the blob of code representing a cell magic invocation
+   * @return Left(the substituted code) or Right(error message)
+   */
+  protected[magic] def parseCell(codeBlob: String): Either[String, String] =
+    parseMagic(codeBlob.trim) match {
+      case Some((cellMagicName, args))
+        if magicLoader.hasCellMagic(cellMagicName) =>
+        Left(substitute(cellMagicName, args))
+      case Some((cellMagicName, _)) =>
+        Right(s"Magic $cellMagicName does not exist!")
+      case None => Left(codeBlob)
+    }
+
+  /**
+   * Parses all lines in a given code blob and either substitutes equivalent
+   * kernel object calls for each line magic in the code blob OR returns
+   * an error message if any of the line magic invocations were invalid.
+   * @param codeBlob a string of code separated by newlines
+   * @return Left(code blob with substitutions) or Right(error message)
+   */
+  protected[magic] def parseLines(codeBlob: String): Either[String, String] = {
+    val trimmedCode = codeBlob.trim
+    parseOutInvalidMagics(trimmedCode) match {
+      case Nil =>
+        Left(trimmedCode.split("\n").map(substituteLine).mkString("\n"))
+      case invalidMagics =>
+        Right(s"Magics [${invalidMagics.mkString(", ")}] do not exist!")
+    }
+  }
+
+  /**
+   * Parses a given code blob and returns an equivalent blob with substitutions
+   * for magic invocations, if any, or an error string.
+   * @param codeBlob the blob of code to parse
+   * @return Left(parsed code) or Right(error message)
+   */
+  def parse(codeBlob: String): Either[String, String] = {
+    val trimCodeBlob = codeBlob.trim
+    if (isCellMagic(trimCodeBlob)) parseCell(trimCodeBlob)
+    else parseLines(trimCodeBlob)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/PostProcessor.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/PostProcessor.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/PostProcessor.scala
new file mode 100644
index 0000000..73c3c9f
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/PostProcessor.scala
@@ -0,0 +1,35 @@
+package com.ibm.spark.kernel.protocol.v5.magic
+
+import com.ibm.spark.interpreter.{ExecuteOutput, Interpreter}
+import com.ibm.spark.kernel.protocol.v5.{Data, MIMEType}
+import com.ibm.spark.magic.{CellMagicOutput, LineMagicOutput}
+import com.ibm.spark.utils.LogLike
+
+/**
+ * Turns the textual output of an execution into the Data returned to the
+ * client, taking into account the value of the interpreter's last execution
+ * variable.
+ * @param interpreter Used to look up and read the last execution variable
+ */
+class PostProcessor(interpreter: Interpreter) extends LogLike {
+  val defaultErr = "Something went wrong in postprocessor!"
+
+  /**
+   * Builds the Data for an execution. A Left held by the last execution
+   * variable is handled as a cell magic result and a Right as a line magic
+   * result; anything else yields the output as plain text.
+   * @param codeOutput The textual output of the execution
+   * @return The data to report for the execution
+   */
+  def process(codeOutput: ExecuteOutput): Data = {
+    val lastValue =
+      interpreter.lastExecutionVariableName.flatMap(interpreter.read)
+
+    lastValue match {
+      case Some(cellResult: Left[_, _]) =>
+        matchCellMagic(codeOutput, cellResult)
+      case Some(lineResult: Right[_, _]) =>
+        matchLineMagic(codeOutput, lineResult)
+      case _ =>
+        Data(MIMEType.PlainText -> codeOutput)
+    }
+  }
+
+  /**
+   * Returns the cell magic output wrapped by the Left, or the code output as
+   * plain text if the Left holds anything else.
+   * @param code The textual output of the execution
+   * @param l The Left read from the last execution variable
+   */
+  protected[magic] def matchCellMagic(code: String, l: Left[_,_]): Data =
+    l.left.getOrElse(None) match {
+      case cellOutput: CellMagicOutput => cellOutput
+      case _ => Data(MIMEType.PlainText -> code)
+    }
+
+  /**
+   * Returns the code output without its last line if the Right wraps a line
+   * magic output, or the full code output as plain text otherwise.
+   * @param code The textual output of the execution
+   * @param r The Right read from the last execution variable
+   */
+  protected[magic] def matchLineMagic(code: String, r: Right[_,_]): Data =
+    r.right.getOrElse(None) match {
+      case _: LineMagicOutput => processLineMagic(code)
+      case _ => Data(MIMEType.PlainText -> code)
+    }
+
+  /**
+   * Drops the final line of the given output and wraps the rest as plain text.
+   * @param code The textual output of the execution
+   * @return Plain text data holding every line except the last
+   */
+  protected[magic] def processLineMagic(code: String): Data = {
+    val lines = code.split("\n")
+    Data(MIMEType.PlainText -> lines.dropRight(1).mkString("\n"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelay.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelay.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelay.scala
new file mode 100644
index 0000000..a1b846a
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelay.scala
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.relay
+
+import java.io.OutputStream
+
+import akka.actor.Actor
+import akka.pattern._
+import akka.util.Timeout
+import com.ibm.spark.interpreter.{ExecuteAborted, ExecuteError, ExecuteFailure, ExecuteOutput}
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.magic.{PostProcessor, MagicParser}
+import com.ibm.spark.magic.MagicLoader
+import com.ibm.spark.utils.LogLike
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+/**
+ * Relays execute requests to the interpreter actor after rewriting any magic
+ * invocations, and pipes the resulting (ExecuteReply, ExecuteResult) pair back
+ * to the sender.
+ * @param actorLoader Used to look up the interpreter actor
+ * @param magicLoader Its dependency map is given the request's output stream
+ * @param magicParser Rewrites magic invocations in the requested code
+ * @param postProcessor Converts successful output into result data
+ */
+case class ExecuteRequestRelay(
+  actorLoader: ActorLoader,
+  magicLoader: MagicLoader,
+  magicParser: MagicParser,
+  postProcessor: PostProcessor
+)
+  extends Actor with LogLike
+{
+  import context._
+  implicit val timeout = Timeout(21474835.seconds)
+
+  /**
+   * Takes an ExecuteFailure and (ExecuteReply, ExecuteResult) with contents
+   * dictated by the type of failure (either an error or an abort).
+   * @param failure the failure
+   * @return (ExecuteReply, ExecuteResult)
+   */
+  private def failureMatch(
+    failure: ExecuteFailure
+  ): (ExecuteReply, ExecuteResult) =
+    failure match {
+      case err: ExecuteError =>
+        val error = ExecuteReplyError(
+          1, Some(err.name), Some(err.value), Some(err.stackTrace)
+        )
+        val result =
+          ExecuteResult(1, Data(MIMEType.PlainText -> err.toString), Metadata())
+        (error, result)
+
+      case _: ExecuteAborted =>
+        val abort = ExecuteReplyAbort(1)
+        val result = ExecuteResult(1, Data(), Metadata())
+        (abort, result)
+    }
+
+  /**
+   * Packages the response into an ExecuteReply,ExecuteResult tuple.
+   * @param future The future containing either the output or failure
+   * @return The tuple representing the proper response
+   */
+  private def packageFutureResponse(
+    future: Future[Either[ExecuteOutput, ExecuteFailure]]
+  ): Future[(ExecuteReply, ExecuteResult)] = future.map {
+    case Left(output) =>
+      val data = postProcessor.process(output)
+      (
+        ExecuteReplyOk(1, Some(Payloads()), Some(UserExpressions())),
+        ExecuteResult(1, data, Metadata())
+      )
+
+    case Right(failure) =>
+      failureMatch(failure)
+  }
+
+  override def receive: Receive = {
+    case (executeRequest: ExecuteRequest, parentMessage: KernelMessage,
+      outputStream: OutputStream) =>
+      val interpreterActor = actorLoader.load(SystemActorType.Interpreter)
+
+      // Store our old sender so we don't lose it in the callback
+      // NOTE: Should point back to our KernelMessageRelay
+      val oldSender = sender()
+
+      // Sets the outputStream for this particular ExecuteRequest
+      magicLoader.dependencyMap.setOutputStream(outputStream)
+
+      // Parse the code for magics before sending it to the interpreter
+      // (the parser yields Left(code) on success, Right(error) otherwise)
+      val response = magicParser.parse(executeRequest.code) match {
+        case Left(code) =>
+          val parsedRequest =
+            (executeRequest.copy(code = code), parentMessage, outputStream)
+          val interpreterFuture = (interpreterActor ? parsedRequest)
+            .mapTo[Either[ExecuteOutput, ExecuteFailure]]
+          packageFutureResponse(interpreterFuture)
+
+        case Right(error) =>
+          // The failure is already known, so no asynchronous work is needed
+          val failure = ExecuteError("Error parsing magics!", error, Nil)
+          Future.successful(failureMatch(failure))
+      }
+
+      // Pipe the response to the original sender
+      response pipeTo oldSender
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelay.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelay.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelay.scala
new file mode 100644
index 0000000..cc45479
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelay.scala
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.relay
+
+import akka.pattern.ask
+import akka.util.Timeout
+import com.ibm.spark.communication.security.SecurityActorType
+import com.ibm.spark.communication.utils.OrderedSupport
+import com.ibm.spark.kernel.protocol.v5.MessageType.MessageType
+import com.ibm.spark.kernel.protocol.v5.content.ShutdownRequest
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.{KernelMessage, MessageType, _}
+import com.ibm.spark.utils.MessageLogSupport
+import scala.collection.immutable.HashMap
+import scala.concurrent.duration._
+import scala.util.{Random, Failure, Success}
+
+/**
+ * This class is meant to be a relay for sending KernelMessages through the
+ * kernel system.
+ * @param actorLoader The ActorLoader used by this class for finding actors for
+ *                    relaying messages
+ * @param useSignatureManager Whether or not to use signature verification and
+ *                            generation
+ * @param incomingSpecialCases The special cases for incoming messages, mapping
+ *                             a message type to the type used for relaying
+ * @param outgoingSpecialCases The special cases for outgoing messages, mapping
+ *                             a message type to the type used for relaying
+ */
+case class KernelMessageRelay(
+  actorLoader: ActorLoader,
+  useSignatureManager: Boolean,
+  incomingSpecialCases: Map[String, String] = new HashMap[String, String](),
+  outgoingSpecialCases: Map[String, String] = new HashMap[String, String]()
+) extends OrderedSupport with MessageLogSupport  {
+  // NOTE: Required to provide the execution context for futures with akka
+  import context._
+
+  // NOTE: Required for ask (?) to function... maybe can define elsewhere?
+  implicit val timeout = Timeout(5.seconds)
+
+  // Flag indicating if can receive messages (or add them to buffer)
+  var isReady = false
+
+  def this(actorLoader: ActorLoader) =
+    this(actorLoader, true)
+
+  /**
+   * Relays a KernelMessage to a specific actor to handle that message.
+   *
+   * @param messageType The enumeration representing the message type
+   * @param kernelMessage The message to relay
+   */
+  private def relay(messageType: MessageType, kernelMessage: KernelMessage) = {
+    logger.debug("Relaying message type of " + messageType.toString)
+    logKernelMessageAction("Relaying", kernelMessage)
+    actorLoader.load(messageType) ! kernelMessage
+  }
+
+  /**
+   * Relays an incoming message, substituting the message type used for the
+   * actor lookup if the message's type is an incoming special case.
+   */
+  private def incomingRelay(kernelMessage: KernelMessage) = {
+    val originalType = kernelMessage.header.msg_type
+
+    // If this is a special case, transform the message type accordingly
+    val messageTypeString = incomingSpecialCases.get(originalType) match {
+      case Some(specialType) =>
+        logger.debug(s"$originalType is a special incoming case!")
+        specialType
+      case None => originalType
+    }
+
+    relay(MessageType.withName(messageTypeString), kernelMessage)
+  }
+
+  /**
+   * Relays an outgoing message, substituting the message type used for the
+   * actor lookup if the message's type is an outgoing special case.
+   */
+  private def outgoingRelay(kernelMessage: KernelMessage) = {
+    val originalType = kernelMessage.header.msg_type
+
+    // If this is a special case, transform the message type accordingly
+    val messageTypeString = outgoingSpecialCases.get(originalType) match {
+      case Some(specialType) =>
+        logger.debug(s"$originalType is a special outgoing case!")
+        specialType
+      case None => originalType
+    }
+
+    relay(MessageType.withName(messageTypeString), kernelMessage)
+  }
+
+  /** Whether the message is a shutdown request (handled even if not ready) */
+  private def isShutdownRequest(kernelMessage: KernelMessage): Boolean =
+    kernelMessage.header.msg_type == ShutdownRequest.toTypeString
+
+  /**
+   * Runs a relay action from within a future callback and marks processing as
+   * finished afterwards, even when the action throws (e.g. for a message type
+   * that MessageType.withName does not know), so that a single bad message
+   * cannot leave the relay waiting forever.
+   */
+  private def relayAndFinish(relayAction: => Unit): Unit =
+    try relayAction finally finishedProcessing()
+
+  /**
+   * This actor will receive and handle two types; ZMQMessage and KernelMessage.
+   * These messages will be forwarded to the actors that are responsible for them.
+   */
+  override def receive = {
+    // TODO: How to restore this when the actor dies?
+    // Update ready status
+    case ready: Boolean =>
+      isReady = ready
+      if (isReady) {
+        logger.info("Unstashing all messages received!")
+        unstashAll()
+        logger.info("Relay is now fully ready to receive messages!")
+      } else {
+        logger.info("Relay is now disabled!")
+      }
+
+    // Add incoming messages (when not ready) to buffer to be processed
+    case (_: Seq[_], kernelMessage: KernelMessage)
+      if !isReady && !isShutdownRequest(kernelMessage) =>
+      logger.info("Not ready for messages! Stashing until ready!")
+      stash()
+
+    // Assuming these messages are incoming messages
+    case (zmqStrings: Seq[_], kernelMessage: KernelMessage)
+      if isReady || isShutdownRequest(kernelMessage) =>
+      startProcessing()
+      if (useSignatureManager) {
+        logger.trace(s"Verifying signature for incoming message " +
+          s"${kernelMessage.header.msg_id}")
+        val signatureManager =
+          actorLoader.load(SecurityActorType.SignatureManager)
+        val signatureVerificationFuture = signatureManager ? (
+          (kernelMessage.signature, zmqStrings)
+        )
+
+        signatureVerificationFuture.mapTo[Boolean].onComplete {
+          case Success(true) =>
+            relayAndFinish(incomingRelay(kernelMessage))
+          case Success(false) =>
+            // TODO: Figure out what the failure message structure should be!
+            logger.error(s"Invalid signature received from message " +
+              s"${kernelMessage.header.msg_id}!")
+            finishedProcessing()
+          case Failure(t) =>
+            logger.error("Failure when verifying signature!", t)
+            finishedProcessing()
+        }
+      } else {
+        logger.debug(s"Relaying incoming message " +
+          s"${kernelMessage.header.msg_id} without SignatureManager")
+        incomingRelay(kernelMessage)
+        finishedProcessing()
+      }
+
+    // Assuming all kernel messages without zmq strings are outgoing
+    case kernelMessage: KernelMessage =>
+      startProcessing()
+      if (useSignatureManager) {
+        logger.trace(s"Creating signature for outgoing message " +
+          s"${kernelMessage.header.msg_id}")
+        val signatureManager =
+          actorLoader.load(SecurityActorType.SignatureManager)
+        val signatureInsertFuture = signatureManager ? kernelMessage
+
+        // Both outcomes must finish processing; otherwise a failed or timed
+        // out signature request would leave processing unfinished forever
+        signatureInsertFuture.mapTo[KernelMessage].onComplete {
+          case Success(message) =>
+            relayAndFinish(outgoingRelay(message))
+          case Failure(t) =>
+            logger.error("Failure when creating signature for outgoing " +
+              s"message ${kernelMessage.header.msg_id}!", t)
+            finishedProcessing()
+        }
+      } else {
+        logger.debug(s"Relaying outgoing message " +
+          s"${kernelMessage.header.msg_id} without SignatureManager")
+        outgoingRelay(kernelMessage)
+        finishedProcessing()
+      }
+  }
+
+  override def orderedTypes(): Seq[Class[_]] = Seq(
+    classOf[(Seq[_], KernelMessage)],
+    classOf[KernelMessage]
+  )
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStream.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStream.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStream.scala
new file mode 100644
index 0000000..e57fd84
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStream.scala
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.stream
+
+import java.io.InputStream
+import java.nio.charset.Charset
+
+import akka.pattern.ask
+import com.ibm.spark.kernel.protocol.v5.content.InputRequest
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.kernel.Utilities.timeout
+import com.ibm.spark.kernel.protocol.v5.{KMBuilder, MessageType}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.{Await, Future}
+
+import KernelInputStream._
+
+/**
+ * Holds the default constructor arguments of the KernelInputStream class.
+ */
+object KernelInputStream {
+  /** Prompt used for input requests when none is supplied. */
+  val DefaultPrompt: String = ""
+
+  /** Whether input requests are flagged as password requests by default. */
+  val DefaultPassword: Boolean = false
+}
+
+/**
+ * Represents an InputStream that requests its data from the clients connected
+ * to the kernel instance whenever its internal buffer is empty.
+ *
+ * @param actorLoader The actor loader used to load the actor that handles
+ *                    input requests and replies
+ * @param kmBuilder The KMBuilder used to construct outgoing kernel messages
+ * @param prompt The prompt to use for input requests
+ * @param password Whether or not the input request is for a password
+ */
+class KernelInputStream(
+  actorLoader: ActorLoader,
+  kmBuilder: KMBuilder,
+  prompt: String = DefaultPrompt,
+  password: Boolean = DefaultPassword
+) extends InputStream {
+  private val EncodingType = Charset.forName("UTF-8")
+
+  // Bytes received from the client that have not been read yet
+  private val internalBytes: ListBuffer[Byte] = ListBuffer()
+
+  /**
+   * Returns the number of bytes available before the next request is made
+   * for more data.
+   * @return The total number of bytes in the internal buffer
+   */
+  override def available(): Int = internalBytes.length
+
+  /**
+   * Returns the next byte of data. If the internal buffer is empty, more
+   * data is first requested from the client, blocking until the reply
+   * arrives.
+   * @return The byte of data as an integer in the range 0 to 255, or -1 if
+   *         the client replied with no data
+   */
+  override def read(): Int = {
+    if (!this.hasByte) this.requestBytes()
+
+    // An empty reply leaves the buffer empty; report end of stream instead
+    // of failing while taking the head of an empty buffer
+    if (this.hasByte) this.nextByte() else -1
+  }
+
+  private def hasByte: Boolean = internalBytes.nonEmpty
+
+  /**
+   * Removes and returns the first buffered byte.
+   * @return The byte as an unsigned value (0 to 255)
+   */
+  private def nextByte(): Int = {
+    // remove(0) drops the head in place rather than copying the tail, and
+    // the mask keeps bytes >= 0x80 from being reported as negative values
+    // (InputStream.read reserves -1 for end of stream)
+    internalBytes.remove(0) & 0xFF
+  }
+
+  /**
+   * Sends an input request built from the prompt and password flag, blocks
+   * until the reply arrives, and appends the reply (encoded as UTF-8) to the
+   * internal buffer.
+   */
+  private def requestBytes(): Unit = {
+    val inputRequest = InputRequest(prompt, password)
+    // NOTE: Assuming already provided parent header and correct ids
+    val kernelMessage = kmBuilder
+      .withHeader(MessageType.Outgoing.InputRequest)
+      .withContentString(inputRequest)
+      .build
+
+    // NOTE: The same handler is being used in both request and reply
+    val responseFuture: Future[String] =
+      (actorLoader.load(MessageType.Incoming.InputReply) ? kernelMessage)
+      .mapTo[String]
+
+    // Block until we get a response
+    import scala.concurrent.duration._
+    internalBytes ++=
+      Await.result(responseFuture, Duration.Inf).getBytes(EncodingType)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
new file mode 100644
index 0000000..56b0cbb
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.stream
+
+import java.io.OutputStream
+import java.nio.charset.Charset
+
+import com.ibm.spark.kernel.protocol.v5.content.StreamContent
+import com.ibm.spark.kernel.protocol.v5.{SystemActorType, MessageType, KMBuilder}
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.utils.{LogLike, ScheduledTaskManager}
+import scala.collection.mutable.ListBuffer
+import KernelOutputStream._
+
+/**
+ * Holds the default constructor arguments of the KernelOutputStream class.
+ */
+object KernelOutputStream {
+  /** Stream type used when none is supplied. */
+  val DefaultStreamType: String = "stdout"
+
+  /** Whether empty output is sent to clients by default. */
+  val DefaultSendEmptyOutput: Boolean = false
+}
+
+/**
+ * An OutputStream that buffers the bytes written to it and sends them, as
+ * stream content, to the clients connected to the kernel instance.
+ *
+ * @param actorLoader The actor loader used to access the message relay
+ * @param kmBuilder The KMBuilder used to construct outgoing kernel messages
+ * @param scheduledTaskManager The task manager used to schedule periodic
+ *                             flushes to send data across the wire
+ * @param streamType The type of stream (stdout/stderr)
+ * @param sendEmptyOutput If true, will allow empty output to be flushed and
+ *                        sent out to listening clients
+ */
+class KernelOutputStream(
+  private val actorLoader: ActorLoader,
+  private val kmBuilder: KMBuilder,
+  private val scheduledTaskManager: ScheduledTaskManager,
+  private val streamType: String = DefaultStreamType,
+  private val sendEmptyOutput: Boolean = DefaultSendEmptyOutput
+) extends OutputStream with LogLike {
+  private val EncodingType = Charset.forName("UTF-8")
+  @volatile private var internalBytes: ListBuffer[Byte] = ListBuffer()
+
+  // Id of the scheduled flush task; null while auto flush is disabled
+  private var taskId: String = _
+
+  // Schedules the periodic flush unless one is already scheduled
+  private def enableAutoFlush(): Unit = {
+    if (taskId == null) {
+      logger.trace("Enabling auto flush")
+      taskId = scheduledTaskManager.addTask(task = this.flush())
+    }
+  }
+
+  // Cancels the periodic flush if one is currently scheduled
+  private def disableAutoFlush(): Unit = {
+    if (taskId != null) {
+      logger.trace("Disabling auto flush")
+      scheduledTaskManager.removeTask(taskId)
+      taskId = null
+    }
+  }
+
+  /**
+   * Empties the internal buffer and stops the auto-flushing while holding
+   * the buffer's lock.
+   * @return The drained bytes decoded as a UTF-8 string
+   */
+  private def drainBuffer(): String = internalBytes.synchronized {
+    logger.trace("Getting content to flush")
+    val decoded = new String(internalBytes.toArray, EncodingType)
+
+    internalBytes.clear()
+    disableAutoFlush()
+
+    decoded
+  }
+
+  /**
+   * Drains the buffered bytes, wraps them in a stream KernelMessage, and
+   * sends that message to the KernelMessageRelay. Content that is empty once
+   * trimmed is only sent when sendEmptyOutput is enabled; otherwise it is
+   * logged and dropped.
+   */
+  override def flush(): Unit = {
+    val pending = drainBuffer()
+
+    if (sendEmptyOutput || pending.trim.nonEmpty) {
+      logger.trace(s"Content to flush: '$pending'")
+
+      val outgoing = kmBuilder
+        .withIds(Seq(MessageType.Outgoing.Stream.toString))
+        .withHeader(MessageType.Outgoing.Stream)
+        .withContentString(StreamContent(streamType, pending))
+        .build
+
+      actorLoader.load(SystemActorType.KernelMessageRelay) ! outgoing
+
+      // Ensure any underlying implementation is processed
+      super.flush()
+    } else {
+      // Make the whitespace visible so the log shows what was dropped
+      val escapes = Seq("\n" -> "\\n", "\t" -> "\\t", "\r" -> "\\r", " " -> "\\s")
+      val contentsWithVisibleWhitespace = escapes.foldLeft(pending) {
+        case (text, (raw, shown)) => text.replace(raw, shown)
+      }
+      logger.warn(s"Suppressing empty output: '$contentsWithVisibleWhitespace'")
+    }
+  }
+
+  /**
+   * Appends the least significant 8 bits of the given value to the internal
+   * buffer; the remaining 24 bits are discarded. Also turns on periodic
+   * flushing if it is not already active.
+   * @param b The value whose least significant 8 bits are to be appended
+   */
+  override def write(b: Int): Unit = internalBytes.synchronized {
+    enableAutoFlush()
+
+    internalBytes.append(b.toByte)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala
new file mode 100644
index 0000000..5555c08
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import java.io.PrintStream
+
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.dependencies._
+import com.ibm.spark.utils.ArgumentParsingSupport
+
+/**
+ * Line magic that retrieves a dependency through the dependency downloader
+ * and adds the resulting jars to the interpreter and the Spark context.
+ */
+class AddDeps extends LineMagic with IncludeInterpreter
+  with IncludeOutputStream with IncludeSparkContext with ArgumentParsingSupport
+  with IncludeDependencyDownloader with IncludeKernel
+{
+
+  // Created on first use, wrapping the magic's output stream
+  private lazy val printStream = new PrintStream(outputStream)
+
+  // Option to retrieve dependencies recursively
+  val _transitive =
+    parser.accepts("transitive", "retrieve dependencies recursively")
+
+  /**
+   * Retrieves the dependency named by the three arguments on the line and
+   * registers its jars; prints the usage help when the number of arguments
+   * is not exactly three.
+   * @param code The single line of code holding the three arguments
+   */
+  override def execute(code: String): Unit = {
+    val coordinates = parseArgs(code)
+    dependencyDownloader.setPrintStream(printStream)
+
+    // TODO: require a version or use the most recent if omitted?
+    if (coordinates.size != 3) {
+      printHelp(printStream, """%AddDeps my.company artifact-id version""")
+    } else {
+      // get the jars and hold onto the paths at which they reside
+      val jarUrls = dependencyDownloader.retrieve(
+        coordinates(0), coordinates(1), coordinates(2), _transitive)
+
+      // add the jars to the interpreter and spark context
+      interpreter.addJars(jarUrls:_*)
+      for (jarUrl <- jarUrls) sparkContext.addJar(jarUrl.getPath)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala
new file mode 100644
index 0000000..bdfdbe2
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import java.io.{File, PrintStream}
+import java.net.URL
+import java.nio.file.{Files, Paths}
+
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.builtin.AddJar._
+import com.ibm.spark.magic.dependencies._
+import com.ibm.spark.utils.{ArgumentParsingSupport, DownloadSupport, LogLike}
+import com.typesafe.config.Config
+
+/**
+ * Holds the directory into which jars are downloaded, resolved once and then
+ * reused.
+ */
+object AddJar {
+
+  // Directory chosen by the first call to getJarDir; None until then
+  private var jarDir:Option[String] = None
+
+  /**
+   * Returns the directory used to store downloaded jars. On the first call
+   * this is the "jar_dir" entry of the config when that entry is present and
+   * the path exists, otherwise a newly created temporary directory. Later
+   * calls return the same value regardless of the config passed in.
+   *
+   * @param config The configuration consulted for the "jar_dir" entry
+   * @return The path of the jar directory
+   */
+  def getJarDir(config: Config): String = jarDir match {
+    case Some(cachedDir) =>
+      cachedDir
+    case None =>
+      val configuredDir =
+        if (config.hasPath("jar_dir")) Some(config.getString("jar_dir")) else None
+      val resolvedDir = configuredDir
+        .filter(dir => Files.exists(Paths.get(dir)))
+        .getOrElse(
+          Files.createTempDirectory("spark_kernel_add_jars").toFile.getAbsolutePath
+        )
+      jarDir = Some(resolvedDir)
+      resolvedDir
+  }
+}
+
+/**
+ * Line magic that downloads a jar (or zip) from a URL into the jar directory
+ * and then either loads it as a magic extension or adds it to the interpreter
+ * and the Spark context.
+ */
+class AddJar
+  extends LineMagic with IncludeInterpreter with IncludeSparkContext
+  with IncludeOutputStream with DownloadSupport with ArgumentParsingSupport
+  with IncludeKernel with IncludeMagicLoader with IncludeConfig with LogLike
+{
+  // Option to force re-downloading of a jar that is already cached
+  private val _force =
+    parser.accepts("f", "forces re-download of specified jar")
+
+  // Option to load the jar as a magic extension rather than adding it to the
+  // interpreter and Spark context
+  private val _magic =
+    parser.accepts("magic", "loads jar as a magic extension")
+
+  // Lazy because the outputStream is not provided at construction
+  private lazy val printStream = new PrintStream(outputStream)
+
+  /**
+   * Extracts the last "/"-separated segment of the file part of a URL.
+   *
+   * @param location The remote location (URL)
+   * @return The last segment, or an empty string if there is none
+   */
+  def getFileFromLocation(location: String): String =
+    new URL(location).getFile.split("/").lastOption.getOrElse("")
+
+  /**
+   * Downloads the jar named on the line (unless a cached copy exists and
+   * re-download is not forced) and registers it. Prints the usage help when
+   * the line does not hold exactly one non-empty location.
+   *
+   * @param code The line containing the location of the jar
+   * @throws IllegalArgumentException if the file name taken from the
+   *                                  location does not end in .jar or .zip
+   */
+  override def execute(code: String): Unit = {
+    val arguments = parseArgs(code.trim)
+
+    // Exactly one, non-empty, jar location is required
+    if (arguments.length != 1 || arguments(0).isEmpty) {
+      printHelp(printStream, """%AddJar <jar_url>""")
+      return
+    }
+
+    val remoteLocation = arguments(0)
+    val jarName = getFileFromLocation(remoteLocation)
+
+    // Ensure the URL actually contains a jar or zip file
+    val hasArchiveSuffix = jarName.endsWith(".jar") || jarName.endsWith(".zip")
+    if (!hasArchiveSuffix) {
+        throw new IllegalArgumentException(s"The jar file $jarName must end in .jar or .zip.")
+    }
+
+    val downloadLocation = getJarDir(config) + "/" + jarName
+    logger.debug( "Downloading jar to %s".format(downloadLocation) )
+
+    val localJar = new File(downloadLocation)
+
+    // Download unless a cached copy exists and re-download was not forced
+    if (_force || !localJar.exists()) {
+      printStream.println(s"Starting download from $remoteLocation")
+
+      downloadFile(new URL(remoteLocation), localJar.toURI.toURL)
+
+      printStream.println(s"Finished download of $jarName")
+    } else {
+      printStream.println(s"Using cached version of $jarName")
+    }
+
+    val localJarUrl = localJar.toURI.toURL
+    if (_magic) {
+      magicLoader.addJar(localJarUrl)
+    } else {
+      interpreter.addJars(localJarUrl)
+      sparkContext.addJar(localJar.getCanonicalPath)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/BuiltinLoader.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/BuiltinLoader.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/BuiltinLoader.scala
new file mode 100644
index 0000000..aecc90f
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/BuiltinLoader.scala
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import com.google.common.reflect.ClassPath
+import com.google.common.reflect.ClassPath.ClassInfo
+import com.ibm.spark.magic.InternalClassLoader
+import com.google.common.base.Strings._
+import scala.collection.JavaConversions._
+
+/**
+ * A class loader that exposes the classes found in the builtin package.
+ */
+class BuiltinLoader
+  extends InternalClassLoader(classOf[BuiltinLoader].getClassLoader) {
+
+  // Package of this loader's runtime class; the default package to scan
+  private val pkgName = this.getClass.getPackage.getName
+
+  /**
+   * Collects the ClassInfo of every top-level class in the given package,
+   * leaving out any class whose simple name equals that of this loader's
+   * own class.
+   * @param pkg package name; a null or empty name yields an empty list
+   * @return list of ClassInfo objects
+   */
+  def getClasses(pkg: String = pkgName): List[ClassInfo] =
+    if (isNullOrEmpty(pkg)) {
+      List()
+    } else {
+      // TODO: Decide if this.getClass.getClassLoader should just be this
+      val ownSimpleName = this.getClass.getSimpleName
+      ClassPath.from(this.getClass.getClassLoader)
+        .getTopLevelClasses(pkg)
+        .filterNot(_.getSimpleName == ownSimpleName)
+        .toList
+    }
+
+  /**
+   * Loads every class reported by getClasses for the given package.
+   * @param pkg package name
+   * @return list of Class[_] objects
+   */
+  def loadClasses(pkg: String = pkgName): List[Class[_]] =
+    for (classInfo <- getClasses(pkg)) yield classInfo.load()
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/Html.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/Html.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/Html.scala
new file mode 100644
index 0000000..95fa31a
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/Html.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import java.io.PrintStream
+
+import com.ibm.spark.kernel.protocol.v5.MIMEType
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.dependencies.IncludeOutputStream
+import com.ibm.spark.utils.ArgumentParsingSupport
+import com.google.common.base.Strings
+
+/**
+ * Cell magic that returns the provided code as HTML content.
+ */
+class Html extends CellMagic with ArgumentParsingSupport
+  with IncludeOutputStream {
+
+  // Lazy because the outputStream is not provided at construction
+  private lazy val printStream = new PrintStream(outputStream)
+
+  /**
+   * Wraps the given code in a cell magic output tagged as HTML. When the code
+   * is null or empty, prints the usage help and returns an empty output.
+   * @param code The content to return as HTML
+   * @return The output holding the HTML, or an empty output
+   */
+  override def execute(code: String): CellMagicOutput =
+    if (Strings.isNullOrEmpty(code)) {
+      printHelp(printStream, """%%Html <string_code>""")
+      CellMagicOutput()
+    } else {
+      CellMagicOutput(MIMEType.TextHtml -> code)
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/JavaScript.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/JavaScript.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/JavaScript.scala
new file mode 100644
index 0000000..42772c1
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/JavaScript.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import java.io.PrintStream
+
+import com.google.common.base.Strings
+import com.ibm.spark.kernel.protocol.v5.MIMEType
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.dependencies.IncludeOutputStream
+import com.ibm.spark.utils.ArgumentParsingSupport
+import org.slf4j.LoggerFactory
+
+/**
+ * Cell magic that returns the provided code as JavaScript content.
+ */
+class JavaScript extends CellMagic with ArgumentParsingSupport
+  with IncludeOutputStream {
+
+  // Lazy because the outputStream is not provided at construction
+  private lazy val printStream = new PrintStream(outputStream)
+
+  /**
+   * Wraps the given code in a cell magic output tagged as JavaScript. When
+   * the code is null or empty, prints the usage help and returns an empty
+   * output.
+   * @param code The content to return as JavaScript
+   * @return The output holding the JavaScript, or an empty output
+   */
+  override def execute(code: String): CellMagicOutput = {
+    def printHelpAndReturn: CellMagicOutput = {
+      // Usage is shown in the cell magic form (double percent), matching the
+      // help text of the Html cell magic
+      printHelp(printStream, """%%JavaScript <string_code>""")
+      CellMagicOutput()
+    }
+
+    if (Strings.isNullOrEmpty(code)) printHelpAndReturn
+    else CellMagicOutput(MIMEType.ApplicationJavaScript -> code)
+  }
+}