You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:24 UTC
[16/51] [abbrv] incubator-toree git commit: Moved scala files to new
locations based on new package
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/InterpreterActor.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/InterpreterActor.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/InterpreterActor.scala
new file mode 100644
index 0000000..b5ae174
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/InterpreterActor.scala
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.interpreter
+
+import java.io.OutputStream
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.pattern.{ask, pipe}
+import akka.util.Timeout
+import com.ibm.spark.interpreter.Interpreter
+import com.ibm.spark.kernel.protocol.v5.KernelMessage
+import com.ibm.spark.kernel.protocol.v5.interpreter.tasks._
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.ibm.spark.interpreter._
+import com.ibm.spark.utils.LogLike
+
+import scala.concurrent.duration._
+
+object InterpreterActor { // companion: Props factory for InterpreterActor
+  def props(interpreter: Interpreter): Props = // the actor's constructor takes a task factory
+    Props(classOf[InterpreterActor], new InterpreterTaskFactory(interpreter))
+}
+
+// TODO: Investigate restart sequence
+//
+// http://doc.akka.io/docs/akka/2.2.3/general/supervision.html
+//
+// "create new actor instance by invoking the originally provided factory again"
+//
+// Does this mean that the interpreter instance is not garbage collected and is passed in?
+//
+class InterpreterActor(
+  interpreterTaskFactory: InterpreterTaskFactory
+) extends Actor with LogLike {
+  // Supplies the implicit ExecutionContext needed by the futures below
+  import context._
+
+  // Implicit timeout picked up by every ask (?) issued from this actor
+  implicit val timeout = Timeout(21474835.seconds)
+
+  // Child task actors; both are assigned in preStart
+  private var executeRequestTask: ActorRef = _
+  private var completeCodeTask: ActorRef = _
+
+  /** Creates one child actor per task, named after its InterpreterChildActorType value. */
+  override def preStart = {
+    val executeTaskName = InterpreterChildActorType.ExecuteRequestTask.toString
+    executeRequestTask =
+      interpreterTaskFactory.ExecuteRequestTask(context, executeTaskName)
+    val completeTaskName = InterpreterChildActorType.CodeCompleteTask.toString
+    completeCodeTask =
+      interpreterTaskFactory.CodeCompleteTask(context, completeTaskName)
+  }
+
+  /**
+   * Asks the matching child task to handle the request and pipes its answer
+   * to the sender; a failed ask is answered with Right(ExecuteError).
+   */
+  override def receive: Receive = {
+    case (executeRequest: ExecuteRequest, parentMessage: KernelMessage,
+      outputStream: OutputStream) =>
+      val payload = (executeRequest, parentMessage, outputStream)
+      (executeRequestTask ? payload) recover {
+        case failure: Throwable =>
+          logger.error(s"Could not execute code ${executeRequest.code} because "
+            + s"of exception: ${failure.getMessage}")
+          asExecuteError(failure)
+      } pipeTo sender
+    case (completeRequest: CompleteRequest) =>
+      logger.debug(s"InterpreterActor requesting code completion for code " +
+        s"${completeRequest.code}")
+      (completeCodeTask ? completeRequest) recover {
+        case failure: Throwable =>
+          logger.error(s"Could not complete code ${completeRequest.code}: " +
+            s"${failure.getMessage}")
+          asExecuteError(failure)
+      } pipeTo sender
+  }
+
+  // Wraps a failure in the Right(ExecuteError) shape used for error replies
+  private def asExecuteError(failure: Throwable) = Right(ExecuteError(
+    failure.getClass.getName,
+    failure.getLocalizedMessage,
+    failure.getStackTrace.map(_.toString).toList))
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/package.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/package.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/package.scala
new file mode 100644
index 0000000..6738520
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/package.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5
+
+package object interpreter {
+ object InterpreterChildActorType extends Enumeration { // identifiers for the interpreter's child task actors
+ type InterpreterChildActorType = Value
+ // InterpreterActor passes each value's string form as the name of the child actor it creates
+ val ExecuteRequestTask = Value("execute_request_task")
+ val CodeCompleteTask = Value("code_complete_task")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/CodeCompleteTaskActor.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/CodeCompleteTaskActor.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/CodeCompleteTaskActor.scala
new file mode 100644
index 0000000..87f37b0
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/CodeCompleteTaskActor.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.interpreter.tasks
+
+import akka.actor.{Actor, Props}
+import com.ibm.spark.interpreter.Interpreter
+import com.ibm.spark.kernel.protocol.v5.content.CompleteRequest
+import com.ibm.spark.utils.LogLike
+
+object CodeCompleteTaskActor {
+ def props(interpreter: Interpreter): Props = // Props that pass the interpreter as the actor's constructor argument
+ Props(classOf[CodeCompleteTaskActor], interpreter)
+}
+
+class CodeCompleteTaskActor(interpreter: Interpreter) extends Actor with LogLike {
+  require(interpreter != null)
+  // Replies to the sender with the interpreter's completion for the request
+  override def receive: Receive = {
+    case request: CompleteRequest =>
+      logger.debug("Invoking the interpreter completion")
+      val completion = interpreter.completion(request.code, request.cursor_pos)
+      sender ! completion
+    case _ =>
+      sender ! "Unknown message" // TODO: Provide a failure message type to be passed around?
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/ExecuteRequestTaskActor.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/ExecuteRequestTaskActor.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/ExecuteRequestTaskActor.scala
new file mode 100644
index 0000000..dc22b21
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/ExecuteRequestTaskActor.scala
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.interpreter.tasks
+
+import java.io.OutputStream
+
+import akka.actor.{Props, Actor}
+import com.ibm.spark.global.StreamState
+import com.ibm.spark.interpreter.{ExecuteAborted, Results, ExecuteError, Interpreter}
+import com.ibm.spark.kernel.api.StreamInfo
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.ibm.spark.security.KernelSecurityManager
+import com.ibm.spark.utils.{ConditionalOutputStream, MultiOutputStream, LogLike}
+
+object ExecuteRequestTaskActor {
+ def props(interpreter: Interpreter): Props = // Props that pass the interpreter as the actor's constructor argument
+ Props(classOf[ExecuteRequestTaskActor], interpreter)
+}
+
+class ExecuteRequestTaskActor(interpreter: Interpreter) extends Actor with LogLike {
+ require(interpreter != null)
+ // Interprets non-empty code and replies Left(output) on success or Right(error/abort) otherwise
+ override def receive: Receive = {
+ case (executeRequest: ExecuteRequest, parentMessage: KernelMessage,
+ outputStream: OutputStream) =>
+ // Only interpret when the cell holds something other than whitespace.
+ if(executeRequest.code.trim != "") {
+ //interpreter.updatePrintStreams(System.in, outputStream, outputStream)
+ val newInputStream = System.in
+ val newOutputStream = buildOutputStream(outputStream, System.out)
+ val newErrorStream = buildOutputStream(outputStream, System.err)
+
+ // Update our global streams to be used by future output
+ // NOTE: This is not async-safe! This is expected to be broken when
+ // running asynchronously! Use an alternative for data
+ // communication!
+ StreamState.setStreams(newInputStream, newOutputStream, newErrorStream)
+
+ val (success, result) = {
+ // NOTE: the StreamInfo binding and Console setup below are commented out; only interpret runs
+// interpreter.doQuietly {
+// interpreter.bind(
+// "$streamInfo",
+// "com.ibm.spark.kernel.api.StreamInfo",
+// new KernelMessage(
+// ids = parentMessage.ids,
+// signature = parentMessage.signature,
+// header = parentMessage.header,
+// parentHeader = parentMessage.parentHeader,
+// metadata = parentMessage.metadata,
+// contentString = parentMessage.contentString
+// ) with StreamInfo,
+// List( """@transient""", """implicit""")
+// )
+ // TODO: Think of a cleaner wrapper to handle updating the Console
+ // input and output streams
+// interpreter.interpret(
+// """val $updateOutput = {
+// Console.setIn(System.in)
+// Console.setOut(System.out)
+// Console.setErr(System.err)
+// }""".trim)
+// }
+ interpreter.interpret(executeRequest.code.trim)
+ }
+
+ logger.debug(s"Interpreter execution result was ${success}")
+ success match {
+ case Results.Success =>
+ val output = result.left.get // assumes a Success result always carries a Left
+ sender ! Left(output)
+ case Results.Error =>
+ val error = result.right.get // assumes an Error result always carries a Right
+ sender ! Right(error)
+ case Results.Aborted =>
+ sender ! Right(new ExecuteAborted)
+ case Results.Incomplete =>
+ // If we get an incomplete it's most likely a syntax error, so
+ // let the user know.
+ sender ! Right(new ExecuteError("Syntax Error.", "", List()))
+ }
+ } else {
+ // Empty (whitespace-only) code is not interpreted; reply with an empty output
+ sender ! Left("")
+ }
+ case unknownValue =>
+ logger.warn(s"Received unknown message type ${unknownValue}")
+ sender ! "Unknown message" // TODO: Provide a failure message type to be passed around?
+ }
+ // Pairs newOutput with the restricted-thread check and defaultOutput with its negation
+ private def buildOutputStream(
+ newOutput: OutputStream,
+ defaultOutput: OutputStream
+ ) = {
+ def isRestrictedThread = { // true when the current thread's group has the restricted group's name
+ val currentGroup = Thread.currentThread().getThreadGroup
+ val restrictedGroupName =
+ KernelSecurityManager.RestrictedGroupName
+
+ currentGroup != null && currentGroup.getName == restrictedGroupName
+ }
+
+ new MultiOutputStream(List[OutputStream](
+ new ConditionalOutputStream(newOutput, isRestrictedThread),
+ new ConditionalOutputStream(defaultOutput, !isRestrictedThread)
+ ))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/InterpreterTaskFactory.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/InterpreterTaskFactory.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/InterpreterTaskFactory.scala
new file mode 100644
index 0000000..5f5a7ad
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/interpreter/tasks/InterpreterTaskFactory.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.interpreter.tasks
+
+import akka.actor.{ActorRefFactory, ActorRef}
+import com.ibm.spark.interpreter.Interpreter
+
+class InterpreterTaskFactory(interpreter: Interpreter) {
+ /** Creates the actor that handles execute requests using this interpreter.
+ * @param actorRefFactory The factory used to create the actor
+ * @param name The name to give the new actor
+ * @return The ActorRef created for the task
+ */
+ def ExecuteRequestTask(actorRefFactory: ActorRefFactory, name: String): ActorRef =
+ actorRefFactory.actorOf(ExecuteRequestTaskActor.props(interpreter), name)
+
+ /**
+ * Creates the actor that handles code completion; same parameters and result as ExecuteRequestTask.
+ */
+ def CodeCompleteTask(actorRefFactory: ActorRefFactory, name: String): ActorRef =
+ actorRefFactory.actorOf(CodeCompleteTaskActor.props(interpreter), name)
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/ActorLoader.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/ActorLoader.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/ActorLoader.scala
new file mode 100644
index 0000000..171934d
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/ActorLoader.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel
+
+import akka.actor.{ActorRefFactory, ActorSelection}
+
+/**
+ * This trait defines the interface for loading actors based on some value
+ * (enum, attribute, etc...). The intent is to let external consumers
+ * acquire actors through a common interface, minimizing the spread of the
+ * logic about the Actors, ActorSystem, and other similar concepts.
+ */
+trait ActorLoader {
+ /**
+ * Finds the actor associated with an enum value. This
+ * enum value can map to an actor associated with handling a specific kernel
+ * message, a socket type, or other functionality.
+ *
+ * @param actorEnum The enum value used to load the actor
+ *
+ * @return An ActorSelection that messages for that actor can be sent to
+ */
+ def load(actorEnum: Enumeration#Value): ActorSelection
+}
+
+case class SimpleActorLoader(actorRefFactory: ActorRefFactory)
+  extends ActorLoader
+{
+  // Path template: actors are selected by name directly under /user
+  private val userActorDirectory: String = "/user/%s"
+
+  /** Selects the actor whose name is the enum value's string form. */
+  override def load(actorEnum: Enumeration#Value): ActorSelection = {
+    val actorPath = userActorDirectory.format(actorEnum.toString)
+    actorRefFactory.actorSelection(actorPath)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
new file mode 100644
index 0000000..73cf1b1
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel
+
+import java.nio.charset.Charset
+
+import akka.util.{ByteString, Timeout}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.utils.LogLike
+import play.api.data.validation.ValidationError
+import play.api.libs.json.{JsPath, Json, Reads}
+
+import scala.concurrent.duration._
+
+object Utilities extends LogLike {
+  // Silences the feature warning for the implicit conversions defined below
+  import scala.language.implicitConversions
+
+  // This timeout needs to be defined for the Akka asks to timeout
+  implicit val timeout = Timeout(21474835.seconds)
+
+  // All String <-> ByteString conversions use UTF-8, never the JVM default charset
+  private val utf8: Charset = Charset.forName("UTF-8")
+
+  /** Decodes the bytes as UTF-8 text. */
+  implicit def ByteStringToString(byteString : ByteString) : String =
+    new String(byteString.toArray, utf8)
+
+  /** Encodes the text as UTF-8 bytes, mirroring ByteStringToString. */
+  implicit def StringToByteString(string : String) : ByteString =
+    ByteString(string.getBytes(utf8))
+
+  // Builds a KernelMessage from raw frames: those before the <IDS|MSG> delimiter
+  // become the ids; the five after it are read, in order, as signature, header,
+  // parent header, metadata and content
+  implicit def ZMQMessageToKernelMessage(message: ZMQMessage): KernelMessage = {
+    val delimiterIndex: Int =
+      message.frames.indexOf(ByteString("<IDS|MSG>".getBytes(utf8)))
+    // TODO Handle the case where there is no delimiter
+    val ids: Seq[String] = message.frames.take(delimiterIndex)
+      .map((frame: ByteString) => new String(frame.toArray, utf8))
+    val header = Json.parse(message.frames(delimiterIndex + 2)).as[Header]
+    // TODO: Investigate better solution than setting parentHeader to null for {}
+    val parentHeader = parseAndHandle(message.frames(delimiterIndex + 3),
+      ParentHeader.headerReads,
+      handler = (valid: ParentHeader) => valid,
+      errHandler = _ => null
+    )
+    val metadata = Json.parse(message.frames(delimiterIndex + 4)).as[Metadata]
+
+    KMBuilder().withIds(ids.toList)
+      .withSignature(message.frame(delimiterIndex + 1))
+      .withHeader(header)
+      .withParentHeader(parentHeader)
+      .withMetadata(metadata)
+      .withContentString(message.frame(delimiterIndex + 5)).build(false)
+  }
+
+  // Flattens a KernelMessage into frames: ids, the <IDS|MSG> delimiter, then
+  // signature, header, parent header, metadata and content
+  implicit def KernelMessageToZMQMessage(kernelMessage : KernelMessage) : ZMQMessage = {
+    val frames: scala.collection.mutable.ListBuffer[ByteString] = scala.collection.mutable.ListBuffer()
+    kernelMessage.ids.foreach((id : String) => frames += id ) // foreach: only the append matters
+    frames += "<IDS|MSG>"
+    frames += kernelMessage.signature
+    frames += Json.toJson(kernelMessage.header).toString()
+    frames += Json.toJson(kernelMessage.parentHeader).toString()
+    frames += Json.toJson(kernelMessage.metadata).toString
+    frames += kernelMessage.contentString
+    ZMQMessage(frames : _*)
+  }
+
+  /** Parses and validates json with reads; logs and throws if validation fails. */
+  def parseAndHandle[T, U](json: String, reads: Reads[T],
+    handler: T => U) : U = {
+    parseAndHandle(json, reads, handler,
+      (invalid: Seq[(JsPath, Seq[ValidationError])]) => {
+        logger.error(s"Could not parse JSON, ${json}")
+        throw new Throwable(s"Could not parse JSON, ${json}")
+      }
+    )
+  }
+
+  /** Parses json; passes the validated value to handler, or the validation errors to errHandler. */
+  def parseAndHandle[T, U](json: String, reads: Reads[T],
+    handler: T => U,
+    errHandler: Seq[(JsPath, Seq[ValidationError])] => U) : U = {
+    Json.parse(json).validate[T](reads).fold(
+      errHandler,
+      (content: T) => handler(content)
+    )
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Control.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Control.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Control.scala
new file mode 100644
index 0000000..df42b03
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Control.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import com.ibm.spark.kernel.protocol.v5.SystemActorType
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+
+/**
+ * The server endpoint for control messages specified in the IPython Kernel Spec
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ * @param actorLoader The actor loader to use to load the relay for kernel
+ * messages
+ */
+class Control(socketFactory: SocketFactory, actorLoader: ActorLoader)
+ extends ZeromqKernelMessageSocket(
+ socketFactory.Control,
+ () => actorLoader.load(SystemActorType.KernelMessageRelay) // thunk: the relay is only looked up when this function is invoked
+ )
+{
+ logger.trace("Created new Control actor")
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Heartbeat.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Heartbeat.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Heartbeat.scala
new file mode 100644
index 0000000..d17c360
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Heartbeat.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import akka.actor.Actor
+import akka.util.ByteString
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.utils.LogLike
+
+/**
+ * The server endpoint for heartbeat messages specified in the IPython Kernel Spec
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ */
+class Heartbeat(socketFactory : SocketFactory) extends Actor with LogLike {
+  logger.debug("Created new Heartbeat actor")
+  val socket = socketFactory.Heartbeat(context.system, self)
+  // Sends every received ZMQMessage back out through the heartbeat socket
+  override def receive: Receive = {
+    case message: ZMQMessage =>
+      // Frames are decoded as UTF-8, not the JVM default, so logs match across hosts
+      logger.trace("Heartbeat received message: " +
+        message.frames.map((frame: ByteString) => frame.utf8String).mkString("\n"))
+      socket ! message
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/IOPub.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/IOPub.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/IOPub.scala
new file mode 100644
index 0000000..d2ff8e9
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/IOPub.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import akka.actor.Actor
+import akka.util.ByteString
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.communication.utils.OrderedSupport
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.kernel.Utilities
+import Utilities._
+import com.ibm.spark.utils.{MessageLogSupport, LogLike}
+
+/**
+ * The server endpoint for IOPub messages specified in the IPython Kernel Spec
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ */
+class IOPub(socketFactory: SocketFactory)
+ extends Actor with MessageLogSupport with OrderedSupport
+{
+ logger.trace("Created new IOPub actor")
+ val socket = socketFactory.IOPub(context.system)
+ override def receive: Receive = {
+ case message: KernelMessage => withProcessing {
+ val zmqMessage: ZMQMessage = message // converted by the implicit KernelMessageToZMQMessage from Utilities
+ logMessage(message)
+ socket ! zmqMessage
+ }
+ }
+
+ /**
+ * Defines the types that will be stashed by {@link #waiting() waiting}
+ * while the Actor is in processing state.
+ * @return The stashable types; here only KernelMessage
+ */
+ override def orderedTypes(): Seq[Class[_]] = Seq(classOf[KernelMessage])
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Shell.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Shell.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Shell.scala
new file mode 100644
index 0000000..b5c4bfb
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Shell.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.SystemActorType
+
+/**
+ * The server endpoint for shell messages specified in the IPython Kernel Spec
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ * @param actorLoader The actor loader to use to load the relay for kernel
+ * messages
+ */
+class Shell(socketFactory: SocketFactory, actorLoader: ActorLoader)
+ extends ZeromqKernelMessageSocket(
+ socketFactory.Shell,
+ () => actorLoader.load(SystemActorType.KernelMessageRelay) // thunk: the relay is only looked up when this function is invoked
+ )
+{
+ logger.trace("Created new Shell actor")
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConfig.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConfig.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConfig.scala
new file mode 100644
index 0000000..5fe3e39
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConfig.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import com.typesafe.config.Config
+import play.api.libs.json.Json
+
+case class SocketConfig ( // field names match the config keys read by SocketConfig.fromConfig
+ stdin_port: Int,
+ control_port: Int,
+ hb_port: Int,
+ shell_port: Int,
+ iopub_port: Int,
+ ip : String,
+ transport: String,
+ signature_scheme: String,
+ key: String
+)
+
+object SocketConfig {
+  // play-json reader and writer generated from the case class definition
+  implicit val socketConfigReads = Json.reads[SocketConfig]
+  implicit val socketConfigWrites = Json.writes[SocketConfig]
+
+  /** Reads each field from the config entry that carries the same name. */
+  def fromConfig(config: Config) = SocketConfig(
+    stdin_port = config.getInt("stdin_port"),
+    control_port = config.getInt("control_port"),
+    hb_port = config.getInt("hb_port"),
+    shell_port = config.getInt("shell_port"),
+    iopub_port = config.getInt("iopub_port"),
+    ip = config.getString("ip"),
+    transport = config.getString("transport"),
+    signature_scheme = config.getString("signature_scheme"),
+    key = config.getString("key")
+  )
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConnection.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConnection.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConnection.scala
new file mode 100644
index 0000000..865c383
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketConnection.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+object SocketConnection {
+  /**
+   * Creates a new SocketConnection from its three components.
+   * @param protocol The protocol portion of the connection
+   * @param ip The hostname or ip address portion of the connection
+   * @param port The port portion of the connection
+   * @return The new SocketConnection
+   */
+  def apply(protocol: String, ip: String, port: Int) =
+    new SocketConnection(protocol, ip, port)
+}
+
+/**
+ * Represents a connection string for a socket.
+ * @param protocol The protocol portion of the connection (e.g. tcp, akka, udp)
+ * @param ip The hostname or ip address to bind on (e.g. *, myhost, 127.0.0.1)
+ * @param port The port for the socket to listen on
+ */
+class SocketConnection(protocol: String, ip: String, port: Int) {
+  /**
+   * Renders the connection in the form `protocol://ip:port`.
+   *
+   * The string is built with interpolation rather than a "%d" format
+   * specifier: "%d" is rendered using the JVM's default locale and may emit
+   * locale-specific digits, which would corrupt the connection string.
+   * @return The connection string
+   */
+  override def toString: String = s"$protocol://$ip:$port"
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketFactory.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketFactory.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketFactory.scala
new file mode 100644
index 0000000..ef2001f
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/SocketFactory.scala
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import akka.actor.{Props, ActorRef, ActorSystem}
+import com.ibm.spark.communication.actors.{RouterSocketActor, RepSocketActor, PubSocketActor}
+
+object SocketFactory {
+  /** Builds a SocketFactory for the given socket configuration. */
+  def apply(socketConfig: SocketConfig) = new SocketFactory(socketConfig)
+}
+
+/**
+ * A factory that creates the socket actors for the channels of the IPython
+ * Kernel Spec (heartbeat, shell, control, stdin, and IOPub).
+ * @param socketConfig The configuration providing the transport, ip, and
+ *                     per-channel ports used to build each connection
+ */
+class SocketFactory(socketConfig: SocketConfig) {
+  // Every channel shares the configured transport and ip; only the port varies
+  private def connectionOn(port: Int) =
+    SocketConnection(socketConfig.transport, socketConfig.ip, port)
+
+  val HeartbeatConnection = connectionOn(socketConfig.hb_port)
+  val ShellConnection = connectionOn(socketConfig.shell_port)
+  val ControlConnection = connectionOn(socketConfig.control_port)
+  val IOPubConnection = connectionOn(socketConfig.iopub_port)
+  val StdinConnection = connectionOn(socketConfig.stdin_port)
+
+  // Starts a RouterSocketActor for the connection, handing it the listener
+  private def routerSocket(
+    system: ActorSystem, connection: SocketConnection, listener: ActorRef
+  ): ActorRef = {
+    val props = Props(classOf[RouterSocketActor], connection.toString, listener)
+    system.actorOf(props)
+  }
+
+  /**
+   * Creates a reply socket actor (RepSocketActor) representing the server
+   * endpoint for heartbeat messages.
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   * @return The ActorRef created for the socket connection
+   */
+  def Heartbeat(system: ActorSystem, listener: ActorRef): ActorRef = {
+    val props =
+      Props(classOf[RepSocketActor], HeartbeatConnection.toString, listener)
+    system.actorOf(props)
+  }
+
+  /**
+   * Creates a router socket actor (RouterSocketActor) representing the server
+   * endpoint for shell messages.
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   * @return The ActorRef created for the socket connection
+   */
+  def Shell(system: ActorSystem, listener: ActorRef): ActorRef =
+    routerSocket(system, ShellConnection, listener)
+
+  /**
+   * Creates a router socket actor (RouterSocketActor) representing the server
+   * endpoint for control messages.
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   * @return The ActorRef created for the socket connection
+   */
+  def Control(system: ActorSystem, listener: ActorRef): ActorRef =
+    routerSocket(system, ControlConnection, listener)
+
+  /**
+   * Creates a router socket actor (RouterSocketActor) representing the server
+   * endpoint for stdin messages.
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   * @return The ActorRef created for the socket connection
+   */
+  def Stdin(system: ActorSystem, listener: ActorRef): ActorRef =
+    routerSocket(system, StdinConnection, listener)
+
+  /**
+   * Creates a publish socket actor (PubSocketActor) representing the server
+   * endpoint for IOPub messages. No listener is involved for this socket.
+   * @param system The actor system the socket actor will belong to
+   * @return The ActorRef created for the socket connection
+   */
+  def IOPub(system: ActorSystem): ActorRef = {
+    val props = Props(classOf[PubSocketActor], IOPubConnection.toString)
+    system.actorOf(props)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Stdin.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Stdin.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Stdin.scala
new file mode 100644
index 0000000..261151e
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/Stdin.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.SystemActorType
+
+/**
+ * The server endpoint for stdin messages specified in the IPython Kernel Spec.
+ *
+ * Supplies the parent ZeromqKernelMessageSocket with the factory's Stdin
+ * method (used to create the socket actor) and with a function that loads the
+ * KernelMessageRelay through the actor loader.
+ *
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ * @param actorLoader The actor loader to use to load the relay for kernel
+ * messages
+ */
+class Stdin(socketFactory: SocketFactory, actorLoader: ActorLoader)
+ extends ZeromqKernelMessageSocket(
+ socketFactory.Stdin,
+ () => actorLoader.load(SystemActorType.KernelMessageRelay)
+ )
+{
+ logger.trace("Created new Stdin actor")
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/ZeromqKernelMessageSocket.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/ZeromqKernelMessageSocket.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/ZeromqKernelMessageSocket.scala
new file mode 100644
index 0000000..3d02f0b
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/socket/ZeromqKernelMessageSocket.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.kernel.socket
+
+import java.nio.charset.Charset
+
+import akka.actor.{ActorSelection, ActorSystem, ActorRef, Actor}
+import akka.util.ByteString
+import com.ibm.spark.communication.ZMQMessage
+
+//import com.ibm.spark.kernel.protocol.v5.kernel.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5.KernelMessage
+import com.ibm.spark.kernel.protocol.v5.kernel.Utilities._
+import com.ibm.spark.utils.MessageLogSupport
+
+/**
+ * Represents a generic socket geared toward two-way communication using
+ * ZeroMQ and KernelMessage structures.
+ *
+ * Incoming ZMQMessages are converted to KernelMessages and forwarded together
+ * with the strings needed for signature verification; KernelMessages received
+ * by this actor are converted to ZMQMessages and sent to the socket actor.
+ *
+ * @param actorSocketFunc The function used to retrieve the actor for outgoing
+ *                        communication via sockets
+ * @param actorForwardFunc The function used to retrieve the actor for incoming
+ *                         kernel messages
+ */
+abstract class ZeromqKernelMessageSocket(
+  actorSocketFunc: (ActorSystem, ActorRef) => ActorRef,
+  actorForwardFunc: () => ActorSelection
+) extends Actor with MessageLogSupport {
+  val actorSocketRef = actorSocketFunc(context.system, self)
+  val actorForwardRef = actorForwardFunc()
+
+  override def receive: Receive = {
+    case message: ZMQMessage =>
+      // Converted through the implicit conversions imported from Utilities
+      val kernelMessage: KernelMessage = message
+      logMessage(kernelMessage)
+
+      // Grab the strings to use for signature verification. Only the last
+      // four frames are used, so only those are decoded, and the charset is
+      // looked up once rather than once per frame.
+      val utf8 = Charset.forName("UTF-8")
+      val zmqStrings = message.frames
+        .takeRight(4) // TODO: This assumes NO extra buffers, refactor?
+        .map((byteString: ByteString) => new String(byteString.toArray, utf8))
+
+      // Forward along our message (along with the strings used for
+      // signatures)
+      actorForwardRef ! ((zmqStrings, kernelMessage))
+
+    case message: KernelMessage =>
+      // Converted through the implicit conversions imported from Utilities
+      val zmqMessage: ZMQMessage = message
+      logMessage(message)
+      actorSocketRef ! zmqMessage
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/MagicParser.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/MagicParser.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/MagicParser.scala
new file mode 100644
index 0000000..a24c062
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/MagicParser.scala
@@ -0,0 +1,145 @@
+package com.ibm.spark.kernel.protocol.v5.magic
+
+import com.ibm.spark.magic.MagicLoader
+
+/**
+ * Parses code for magic invocations ("%magic" line magics and "%%magic" cell
+ * magics) and rewrites valid invocations into calls on the kernel object.
+ * @param magicLoader used to check whether a named line or cell magic exists
+ */
+class MagicParser(magicLoader: MagicLoader) {
+  // Captures the magic name that follows one or two leading '%' characters.
+  // The '^' anchor means only a magic at the very start of the input matches.
+  private val magicRegex = """^[%]{1,2}(\w+)""".r
+  protected[magic] val kernelObjectName = "kernel.magics"
+
+  /**
+   * Determines whether a given line of code represents a line magic.
+   * @param codeLine a single line of code
+   * @return true if the line starts with "%" but not with "%%"
+   */
+  private def isLineMagic(codeLine: String) =
+    codeLine.startsWith("%") && !isCellMagic(codeLine)
+
+  /**
+   * Determines whether a given string of code represents a cell magic.
+   * @param codeBlob a string of code separated by newlines
+   * @return true if the code starts with "%%"
+   */
+  private def isCellMagic(codeBlob: String) = codeBlob.startsWith("%%")
+
+  /**
+   * Matches a "magic string" i.e. "%%magic" or "%magic" at the start of a
+   * given code string, and separates the magic name from the code that
+   * follows it.
+   *
+   * E.g.
+   * "%magic foo bar" -> Some(("magic", "foo bar"))
+   * @param codeBlob a string of code separated by newlines
+   * @return Some((magicName, args)) with args trimmed, or None if the code
+   *         does not start with a magic string
+   */
+  protected[magic] def parseMagic(codeBlob: String): Option[(String, String)] =
+    magicRegex.findFirstMatchIn(codeBlob).map { m =>
+      (m.group(1), m.after(1).toString.trim)
+    }
+
+  /**
+   * Given a line of code representing a magic invocation determines whether
+   * the magic has an implementation.
+   * @param codeLine a single line of code
+   * @return true if the magic exists and is a line magic
+   */
+  protected[magic] def isValidLineMagic(codeLine: String): Boolean =
+    isLineMagic(codeLine) && parseMagic(codeLine).exists {
+      case (magicName, _) => magicLoader.hasLineMagic(magicName)
+    }
+
+  /**
+   * Given a blob of code, finds any magic invocations of magics that don't
+   * exist.
+   *
+   * Lines that start with "%" but carry no parseable magic name (for example
+   * "% 3" or a lone "%") are not magic invocations; they are skipped here
+   * instead of raising NoSuchElementException, and are left as plain code.
+   * @param codeBlob a string of code separated by newlines
+   * @return invalid magic names from the given code blob
+   */
+  protected[magic] def parseOutInvalidMagics(codeBlob: String): List[String] =
+    codeBlob.split("\n").toList
+      .filter(isLineMagic)
+      .flatMap(line => parseMagic(line))
+      .collect {
+        case (magicName, _) if !magicLoader.hasLineMagic(magicName) => magicName
+      }
+
+  /**
+   * Formats a given magic name and args to code for a kernel method call.
+   *
+   * NOTE(review): args are embedded verbatim inside a triple-quoted literal,
+   * so args that themselves contain a triple quote would end the generated
+   * literal early.
+   * @param magicName the name of the magic
+   * @param args the arguments to the magic
+   * @return equivalent kernel method call
+   */
+  protected[magic] def substitute(magicName: String, args: String): String =
+    s"""$kernelObjectName.$magicName(\"\"\"$args\"\"\")"""
+
+  /**
+   * Formats a given line of code representing a line magic invocation into an
+   * equivalent kernel object call if the magic invocation is valid.
+   * @param codeLine the line of code to convert.
+   * @return a substituted line of code if valid else the original line
+   */
+  protected[magic] def substituteLine(codeLine: String): String =
+    parseMagic(codeLine) match {
+      case Some((magicName, args)) if isValidLineMagic(codeLine) =>
+        substitute(magicName, args)
+      case _ => codeLine
+    }
+
+  /**
+   * Formats a given code blob representing a cell magic invocation into an
+   * equivalent kernel object call if the cell magic invocation is valid. An
+   * error message is returned if not. Code that does not start with a magic
+   * string is returned unchanged.
+   * @param codeBlob the blob of code representing a cell magic invocation
+   * @return Left(the substituted code) or Right(error message)
+   */
+  protected[magic] def parseCell(codeBlob: String): Either[String, String] =
+    parseMagic(codeBlob.trim) match {
+      case Some((cellMagicName, args))
+        if magicLoader.hasCellMagic(cellMagicName) =>
+        Left(substitute(cellMagicName, args))
+      case Some((cellMagicName, _)) =>
+        Right(s"Magic $cellMagicName does not exist!")
+      case None => Left(codeBlob)
+    }
+
+  /**
+   * Parses all lines in a given code blob and either substitutes equivalent
+   * kernel object calls for each line magic in the code blob OR returns
+   * an error message if any of the line magic invocations were invalid.
+   * @param codeBlob a string of code separated by newlines
+   * @return Left(code blob with substitutions) or Right(error message)
+   */
+  protected[magic] def parseLines(codeBlob: String): Either[String, String] = {
+    val trimmedBlob = codeBlob.trim
+    parseOutInvalidMagics(trimmedBlob) match {
+      case Nil =>
+        Left(trimmedBlob.split("\n").map(substituteLine).mkString("\n"))
+      case invalidMagics =>
+        Right(s"Magics [${invalidMagics.mkString(", ")}] do not exist!")
+    }
+  }
+
+  /**
+   * Parses a given code blob and returns an equivalent blob with substitutions
+   * for magic invocations, if any, or an error string. The blob is trimmed
+   * first; a blob starting with "%%" is handled as a cell magic and anything
+   * else is processed line by line.
+   * @param codeBlob the blob of code to parse
+   * @return Left(parsed code) or Right(error message)
+   */
+  def parse(codeBlob: String): Either[String, String] = {
+    val trimCodeBlob = codeBlob.trim
+    if (isCellMagic(trimCodeBlob)) parseCell(trimCodeBlob)
+    else parseLines(trimCodeBlob)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/PostProcessor.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/PostProcessor.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/PostProcessor.scala
new file mode 100644
index 0000000..73c3c9f
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/magic/PostProcessor.scala
@@ -0,0 +1,35 @@
+package com.ibm.spark.kernel.protocol.v5.magic
+
+import com.ibm.spark.interpreter.{ExecuteOutput, Interpreter}
+import com.ibm.spark.kernel.protocol.v5.{Data, MIMEType}
+import com.ibm.spark.magic.{CellMagicOutput, LineMagicOutput}
+import com.ibm.spark.utils.LogLike
+
+/**
+ * Turns the output of an executed piece of code into the Data returned to
+ * the client, taking into account whether the last value produced by the
+ * interpreter was the output of a cell magic or a line magic.
+ * @param interpreter used to look up and read the last execution variable
+ */
+class PostProcessor(interpreter: Interpreter) extends LogLike {
+  val defaultErr = "Something went wrong in postprocessor!"
+
+  /**
+   * Builds the Data for the given output. A Left read from the last
+   * execution variable is treated as a cell magic result and a Right as a
+   * line magic result; anything else yields the output as plain text.
+   * @param codeOutput the output of the executed code
+   * @return the Data to return for this execution
+   */
+  def process(codeOutput: ExecuteOutput): Data = {
+    val lastResult =
+      interpreter.lastExecutionVariableName.flatMap(interpreter.read)
+
+    lastResult match {
+      case Some(l: Left[_, _]) => matchCellMagic(codeOutput, l)
+      case Some(r: Right[_, _]) => matchLineMagic(codeOutput, r)
+      case _ => Data(MIMEType.PlainText -> codeOutput)
+    }
+  }
+
+  /**
+   * Returns the wrapped CellMagicOutput if the Left holds one; otherwise
+   * returns the code output as plain text.
+   */
+  protected[magic] def matchCellMagic(code: String, l: Left[_,_]) =
+    l match {
+      case Left(cmo: CellMagicOutput) => cmo
+      case _ => Data(MIMEType.PlainText -> code)
+    }
+
+  /**
+   * Post-processes the code output as line magic output if the Right holds a
+   * LineMagicOutput; otherwise returns the code output as plain text.
+   */
+  protected[magic] def matchLineMagic(code: String, r: Right[_,_]) =
+    r match {
+      case Right(_: LineMagicOutput) => processLineMagic(code)
+      case _ => Data(MIMEType.PlainText -> code)
+    }
+
+  /**
+   * Drops the final line of the output and returns the rest as plain text.
+   * @param code the output of the executed code
+   * @return the output without its last line
+   */
+  protected[magic] def processLineMagic(code: String): Data = {
+    val keptLines = code.split("\n").dropRight(1)
+    Data(MIMEType.PlainText -> keptLines.mkString("\n"))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelay.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelay.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelay.scala
new file mode 100644
index 0000000..a1b846a
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelay.scala
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.relay
+
+import java.io.OutputStream
+
+import akka.actor.Actor
+import akka.pattern._
+import akka.util.Timeout
+import com.ibm.spark.interpreter.{ExecuteAborted, ExecuteError, ExecuteFailure, ExecuteOutput}
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.magic.{PostProcessor, MagicParser}
+import com.ibm.spark.magic.MagicLoader
+import com.ibm.spark.utils.LogLike
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+/**
+ * Actor that parses the code of an execute request for magics, hands the
+ * parsed request to the interpreter actor, and pipes the resulting
+ * (ExecuteReply, ExecuteResult) pair back to the original sender.
+ * @param actorLoader used to load the interpreter actor
+ * @param magicLoader provides the dependency map that receives the request's
+ *                    output stream
+ * @param magicParser parses the request's code for magic invocations
+ * @param postProcessor converts successful output into the result Data
+ */
+case class ExecuteRequestRelay(
+  actorLoader: ActorLoader,
+  magicLoader: MagicLoader,
+  magicParser: MagicParser,
+  postProcessor: PostProcessor
+)
+  extends Actor with LogLike
+{
+  import context._
+  // 21474835 seconds is roughly 248 days; presumably chosen so that asking
+  // the interpreter effectively never times out -- confirm
+  implicit val timeout = Timeout(21474835.seconds)
+
+  /**
+   * Converts an ExecuteFailure into the reply/result pair matching the kind
+   * of failure (either an error or an abort).
+   * @param failure the failure
+   * @return (ExecuteReply, ExecuteResult)
+   */
+  private def failureMatch(failure: ExecuteFailure) =
+    failure match {
+      case err: ExecuteError =>
+        (
+          ExecuteReplyError(
+            1, Some(err.name), Some(err.value), Some(err.stackTrace)
+          ),
+          ExecuteResult(1, Data(MIMEType.PlainText -> err.toString), Metadata())
+        )
+
+      case _: ExecuteAborted =>
+        (ExecuteReplyAbort(1), ExecuteResult(1, Data(), Metadata()))
+    }
+
+  /**
+   * Packages the response into an ExecuteReply,ExecuteResult tuple.
+   * @param future The future containing either the output or failure
+   * @return The tuple representing the proper response
+   */
+  private def packageFutureResponse(
+    future: Future[Either[ExecuteOutput, ExecuteFailure]]
+  ): Future[(ExecuteReply, ExecuteResult)] = future.map {
+    case Left(output) =>
+      val data = postProcessor.process(output)
+      (
+        ExecuteReplyOk(1, Some(Payloads()), Some(UserExpressions())),
+        ExecuteResult(1, data, Metadata())
+      )
+
+    case Right(failure) =>
+      failureMatch(failure)
+  }
+
+  override def receive: Receive = {
+    case (executeRequest: ExecuteRequest, parentMessage: KernelMessage,
+      outputStream: OutputStream) =>
+      val interpreterActor = actorLoader.load(SystemActorType.Interpreter)
+
+      // Capture the sender now; it must not be read inside the future callback
+      // NOTE: Should point back to our KernelMessageRelay
+      val oldSender = sender()
+
+      // Sets the outputStream for this particular ExecuteRequest
+      magicLoader.dependencyMap.setOutputStream(outputStream)
+
+      // Parse the code for magics before sending it to the interpreter; a
+      // parse error is reported as an execution error without involving it
+      val response = magicParser.parse(executeRequest.code) match {
+        case Left(code) =>
+          val parsedRequest =
+            (executeRequest.copy(code = code), parentMessage, outputStream)
+          val interpreterFuture = (interpreterActor ? parsedRequest)
+            .mapTo[Either[ExecuteOutput, ExecuteFailure]]
+          packageFutureResponse(interpreterFuture)
+
+        case Right(error) =>
+          val failure = ExecuteError("Error parsing magics!", error, Nil)
+          Future { failureMatch(failure) }
+      }
+
+      // Deliver the reply/result pair to the original sender
+      response pipeTo oldSender
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelay.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelay.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelay.scala
new file mode 100644
index 0000000..cc45479
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelay.scala
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.relay
+
+import akka.pattern.ask
+import akka.util.Timeout
+import com.ibm.spark.communication.security.SecurityActorType
+import com.ibm.spark.communication.utils.OrderedSupport
+import com.ibm.spark.kernel.protocol.v5.MessageType.MessageType
+import com.ibm.spark.kernel.protocol.v5.content.ShutdownRequest
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.{KernelMessage, MessageType, _}
+import com.ibm.spark.utils.MessageLogSupport
+import scala.collection.immutable.HashMap
+import scala.concurrent.duration._
+import scala.util.{Random, Failure, Success}
+
+/**
+ * This class is meant to be a relay for sending KernelMessages through the
+ * kernel system.
+ * @param actorLoader The ActorLoader used by this class for finding actors for
+ *                    relaying messages
+ * @param useSignatureManager Whether or not to use signature verification and
+ *                            generation
+ * @param incomingSpecialCases The special cases for incoming messages
+ * @param outgoingSpecialCases The special cases for outgoing messages
+ */
+case class KernelMessageRelay(
+  actorLoader: ActorLoader,
+  useSignatureManager: Boolean,
+  incomingSpecialCases: Map[String, String] = new HashMap[String, String](),
+  outgoingSpecialCases: Map[String, String] = new HashMap[String, String]()
+) extends OrderedSupport with MessageLogSupport {
+  // NOTE: Required to provide the execution context for futures with akka
+  import context._
+
+  // NOTE: Required for ask (?) to function... maybe can define elsewhere?
+  implicit val timeout = Timeout(5.seconds)
+
+  // Flag indicating if can receive messages (or add them to buffer)
+  var isReady = false
+
+  /** Creates a relay that uses the signature manager and no special cases. */
+  def this(actorLoader: ActorLoader) =
+    this(actorLoader, true)
+
+  /**
+   * Relays a KernelMessage to a specific actor to handle that message.
+   *
+   * @param messageType The enumeration representing the message type
+   * @param kernelMessage The message to relay
+   */
+  private def relay(messageType: MessageType, kernelMessage: KernelMessage) = {
+    logger.debug("Relaying message type of " + messageType.toString)
+    logKernelMessageAction("Relaying", kernelMessage)
+    actorLoader.load(messageType) ! kernelMessage
+  }
+
+  /**
+   * Relays an incoming message, first mapping its message type through the
+   * incoming special cases when an entry exists for it.
+   */
+  private def incomingRelay(kernelMessage: KernelMessage) = {
+    var messageTypeString = kernelMessage.header.msg_type
+
+    // If this is a special case, transform the message type accordingly
+    if (incomingSpecialCases.contains(messageTypeString)) {
+      logger.debug(s"$messageTypeString is a special incoming case!")
+      messageTypeString = incomingSpecialCases(messageTypeString)
+    }
+
+    relay(MessageType.withName(messageTypeString), kernelMessage)
+  }
+
+  /**
+   * Relays an outgoing message, first mapping its message type through the
+   * outgoing special cases when an entry exists for it.
+   */
+  private def outgoingRelay(kernelMessage: KernelMessage) = {
+    var messageTypeString = kernelMessage.header.msg_type
+
+    // If this is a special case, transform the message type accordingly
+    if (outgoingSpecialCases.contains(messageTypeString)) {
+      logger.debug(s"$messageTypeString is a special outgoing case!")
+      messageTypeString = outgoingSpecialCases(messageTypeString)
+    }
+
+    relay(MessageType.withName(messageTypeString), kernelMessage)
+  }
+
+
+  /**
+   * This actor will receive and handle two types; ZMQMessage and KernelMessage.
+   * These messages will be forwarded to the actors that are responsible for them.
+   *
+   * Every branch that calls startProcessing() also calls finishedProcessing()
+   * on each of its outcomes, including failed futures.
+   */
+  override def receive = {
+    // TODO: How to restore this when the actor dies?
+    // Update ready status
+    case ready: Boolean =>
+      isReady = ready
+      if (isReady) {
+        logger.info("Unstashing all messages received!")
+        unstashAll()
+        logger.info("Relay is now fully ready to receive messages!")
+      } else {
+        logger.info("Relay is now disabled!")
+      }
+
+
+    // Add incoming messages (when not ready) to buffer to be processed
+    case (zmqStrings: Seq[_], kernelMessage: KernelMessage) if !isReady && kernelMessage.header.msg_type != ShutdownRequest.toTypeString =>
+      logger.info("Not ready for messages! Stashing until ready!")
+      stash()
+
+    // Assuming these messages are incoming messages
+    case (zmqStrings: Seq[_], kernelMessage: KernelMessage) if isReady || kernelMessage.header.msg_type == ShutdownRequest.toTypeString =>
+      startProcessing()
+      if (useSignatureManager) {
+        logger.trace(s"Verifying signature for incoming message " +
+          s"${kernelMessage.header.msg_id}")
+        val signatureManager =
+          actorLoader.load(SecurityActorType.SignatureManager)
+        val signatureVerificationFuture = signatureManager ? (
+          (kernelMessage.signature, zmqStrings)
+        )
+
+        signatureVerificationFuture.mapTo[Boolean].onComplete {
+          case Success(true) =>
+            // finally: an exception while relaying (e.g. an unknown message
+            // type) must not skip finishedProcessing()
+            try incomingRelay(kernelMessage)
+            finally finishedProcessing()
+          case Success(false) =>
+            // TODO: Figure out what the failure message structure should be!
+            logger.error(s"Invalid signature received from message " +
+              s"${kernelMessage.header.msg_id}!")
+            finishedProcessing()
+          case Failure(t) =>
+            logger.error("Failure when verifying signature!", t)
+            finishedProcessing()
+        }
+      } else {
+        logger.debug(s"Relaying incoming message " +
+          s"${kernelMessage.header.msg_id} without SignatureManager")
+        incomingRelay(kernelMessage)
+        finishedProcessing()
+      }
+
+    // Assuming all kernel messages without zmq strings are outgoing
+    case kernelMessage: KernelMessage =>
+      startProcessing()
+      if (useSignatureManager) {
+        logger.trace(s"Creating signature for outgoing message " +
+          s"${kernelMessage.header.msg_id}")
+        val signatureManager = actorLoader.load(SecurityActorType.SignatureManager)
+        val signatureInsertFuture = signatureManager ? kernelMessage
+
+        // Handle both outcomes: a failed future (ask timeout or a reply that
+        // is not a KernelMessage) must still call finishedProcessing()
+        signatureInsertFuture.mapTo[KernelMessage].onComplete {
+          case Success(message) =>
+            try outgoingRelay(message)
+            finally finishedProcessing()
+          case Failure(t) =>
+            logger.error("Failure when creating signature!", t)
+            finishedProcessing()
+        }
+      } else {
+        logger.debug(s"Relaying outgoing message " +
+          s"${kernelMessage.header.msg_id} without SignatureManager")
+        outgoingRelay(kernelMessage)
+        finishedProcessing()
+      }
+  }
+
+  /** The message classes reported to OrderedSupport as ordered types. */
+  override def orderedTypes(): Seq[Class[_]] = Seq(
+    classOf[(Seq[_], KernelMessage)],
+    classOf[KernelMessage]
+  )
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStream.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStream.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStream.scala
new file mode 100644
index 0000000..e57fd84
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStream.scala
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.stream
+
+import java.io.InputStream
+import java.nio.charset.Charset
+
+import akka.pattern.ask
+import com.ibm.spark.kernel.protocol.v5.content.InputRequest
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.kernel.Utilities.timeout
+import com.ibm.spark.kernel.protocol.v5.{KMBuilder, MessageType}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.{Await, Future}
+
+import KernelInputStream._
+
+/**
+ * Default values used by KernelInputStream when the caller does not supply
+ * a prompt or a password flag.
+ */
+object KernelInputStream {
+  /** Prompt used for input requests when none is provided. */
+  val DefaultPrompt: String = ""
+
+  /** Default value of the password flag for input requests. */
+  val DefaultPassword: Boolean = false
+}
+
+/**
+ * Represents an InputStream that requests its data from the clients connected
+ * to the kernel instance. Data is fetched on demand: when the internal buffer
+ * is empty, an input request is sent and the stream blocks until the reply
+ * arrives.
+ *
+ * @param actorLoader The actor loader used to load the actor that handles
+ *                    input requests/replies
+ * @param kmBuilder The KMBuilder used to construct outgoing kernel messages
+ * @param prompt The prompt to use for input requests
+ * @param password Whether or not the input request is for a password
+ */
+class KernelInputStream(
+  actorLoader: ActorLoader,
+  kmBuilder: KMBuilder,
+  prompt: String = DefaultPrompt,
+  password: Boolean = DefaultPassword
+) extends InputStream {
+  private val EncodingType = Charset.forName("UTF-8")
+
+  // Bytes received from the client that have not been read yet. The buffer is
+  // only ever mutated in place, so the reference itself never changes.
+  private val internalBytes: ListBuffer[Byte] = ListBuffer.empty[Byte]
+
+  /**
+   * Returns the number of bytes available before the next request is made
+   * for more data.
+   * @return The total number of bytes in the internal buffer
+   */
+  override def available(): Int = internalBytes.length
+
+  /**
+   * Returns the next byte of data. If the internal buffer is empty, a request
+   * for more data is sent to the client first and this call blocks until the
+   * reply has been received.
+   * @return The byte of data as an integer in the range 0 to 255, or -1 if
+   *         the client replied with no data
+   */
+  override def read(): Int = {
+    if (!this.hasByte) this.requestBytes()
+
+    // An empty reply leaves the buffer empty; report end-of-stream rather
+    // than failing on an empty buffer
+    if (this.hasByte) this.nextByte() else -1
+  }
+
+  /** Indicates whether at least one unread byte is buffered. */
+  private def hasByte: Boolean = internalBytes.nonEmpty
+
+  /**
+   * Removes and returns the first buffered byte.
+   * @return The byte as an unsigned value (0 to 255)
+   */
+  private def nextByte(): Int = {
+    // Mask to the low 8 bits: a raw Byte widens to a negative Int for values
+    // >= 0x80 (any UTF-8 multi-byte sequence), which would break the
+    // InputStream contract and make 0xFF indistinguishable from -1 (EOF)
+    internalBytes.remove(0) & 0xFF
+  }
+
+  /**
+   * Sends an input request to the client and appends the UTF-8 bytes of the
+   * reply to the internal buffer. Blocks until the reply arrives.
+   */
+  private def requestBytes(): Unit = {
+    val inputRequest = InputRequest(prompt, password)
+    // NOTE: Assuming already provided parent header and correct ids
+    val kernelMessage = kmBuilder
+      .withHeader(MessageType.Outgoing.InputRequest)
+      .withContentString(inputRequest)
+      .build
+
+    // NOTE: The same handler is being used in both request and reply
+    val responseFuture: Future[String] =
+      (actorLoader.load(MessageType.Incoming.InputReply) ? kernelMessage)
+        .mapTo[String]
+
+    // Block until we get a response
+    import scala.concurrent.duration._
+    internalBytes ++=
+      Await.result(responseFuture, Duration.Inf).getBytes(EncodingType)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
new file mode 100644
index 0000000..56b0cbb
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.stream
+
+import java.io.OutputStream
+import java.nio.charset.Charset
+
+import com.ibm.spark.kernel.protocol.v5.content.StreamContent
+import com.ibm.spark.kernel.protocol.v5.{SystemActorType, MessageType, KMBuilder}
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.utils.{LogLike, ScheduledTaskManager}
+import scala.collection.mutable.ListBuffer
+import KernelOutputStream._
+
+/**
+ * Default values used by KernelOutputStream when the caller does not supply
+ * a stream type or an empty-output flag.
+ */
+object KernelOutputStream {
+  /** Stream type used when none is provided. */
+  val DefaultStreamType: String = "stdout"
+
+  /** Default value of the flag that allows empty output to be sent. */
+  val DefaultSendEmptyOutput: Boolean = false
+}
+
+/**
+ * An OutputStream that buffers written bytes and, on flush, sends them as a
+ * stream kernel message to the kernel message relay.
+ *
+ * @param actorLoader The actor loader used to access the message relay
+ * @param kmBuilder The KMBuilder used to construct outgoing kernel messages
+ * @param scheduledTaskManager The task manager used to schedule flushes of
+ *                             the buffered data
+ * @param streamType The type of stream (stdout/stderr)
+ * @param sendEmptyOutput If true, output that is empty once trimmed is still
+ *                        sent; otherwise it is suppressed
+ */
+class KernelOutputStream(
+  private val actorLoader: ActorLoader,
+  private val kmBuilder: KMBuilder,
+  private val scheduledTaskManager: ScheduledTaskManager,
+  private val streamType: String = DefaultStreamType,
+  private val sendEmptyOutput: Boolean = DefaultSendEmptyOutput
+) extends OutputStream with LogLike {
+  private val EncodingType = Charset.forName("UTF-8")
+  @volatile private var internalBytes: ListBuffer[Byte] = ListBuffer()
+
+  // Id of the scheduled flush task; None while no task is registered
+  private var flushTaskId: Option[String] = None
+
+  /** Registers the flush task unless one is already registered. */
+  private def enableAutoFlush(): Unit =
+    if (flushTaskId.isEmpty) {
+      logger.trace("Enabling auto flush")
+      flushTaskId = Option(scheduledTaskManager.addTask(task = this.flush()))
+    }
+
+  /** Removes the registered flush task, if there is one. */
+  private def disableAutoFlush(): Unit =
+    flushTaskId.foreach { registeredId =>
+      logger.trace("Disabling auto flush")
+      scheduledTaskManager.removeTask(registeredId)
+      flushTaskId = None
+    }
+
+  /**
+   * Drains the internal buffer, decodes it as UTF-8, wraps the text in a
+   * stream KernelMessage, and sends that message to the KernelMessageRelay.
+   * When the text is empty after trimming and sendEmptyOutput is false,
+   * nothing is sent and only a warning is logged.
+   */
+  override def flush(): Unit = {
+    // Decode and clear under the buffer lock so writes cannot interleave
+    val pendingText = internalBytes.synchronized {
+      logger.trace("Getting content to flush")
+      val decoded = new String(internalBytes.toArray, EncodingType)
+
+      internalBytes.clear()
+      disableAutoFlush()
+
+      decoded
+    }
+
+    val shouldSuppress = !sendEmptyOutput && pendingText.trim.isEmpty
+
+    if (shouldSuppress) {
+      // Make the whitespace visible so the log shows what was dropped
+      val visibleWhitespace = pendingText
+        .replace("\n", "\\n")
+        .replace("\t", "\\t")
+        .replace("\r", "\\r")
+        .replace(" ", "\\s")
+      logger.warn(s"Suppressing empty output: '$visibleWhitespace'")
+    } else {
+      logger.trace(s"Content to flush: '$pendingText'")
+
+      val streamContent = StreamContent(streamType, pendingText)
+
+      val kernelMessage = kmBuilder
+        .withIds(Seq(MessageType.Outgoing.Stream.toString))
+        .withHeader(MessageType.Outgoing.Stream)
+        .withContentString(streamContent)
+        .build
+
+      actorLoader.load(SystemActorType.KernelMessageRelay) ! kernelMessage
+
+      // Ensure any underlying implementation is processed
+      super.flush()
+    }
+  }
+
+  /**
+   * Appends the least significant 8 bits of the given value to the internal
+   * buffer; the remaining 24 bits are ignored. Also registers the flush task
+   * if none is currently registered.
+   * @param b The value whose least significant 8 bits are to be appended
+   */
+  override def write(b: Int): Unit = internalBytes.synchronized {
+    enableAutoFlush()
+
+    internalBytes.append(b.toByte)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala
new file mode 100644
index 0000000..5555c08
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddDeps.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import java.io.PrintStream
+
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.dependencies._
+import com.ibm.spark.utils.ArgumentParsingSupport
+
+/**
+ * Line magic that retrieves a dependency described by three positional
+ * arguments and adds the resulting jars to the interpreter and the Spark
+ * context.
+ */
+class AddDeps extends LineMagic with IncludeInterpreter
+  with IncludeOutputStream with IncludeSparkContext with ArgumentParsingSupport
+  with IncludeDependencyDownloader with IncludeKernel
+{
+
+  // Lazy so the output stream is only accessed on first use
+  private lazy val printStream = new PrintStream(outputStream)
+
+  // Option requesting that dependencies be retrieved recursively
+  val _transitive =
+    parser.accepts("transitive", "retrieve dependencies recursively")
+
+  /**
+   * Executes the line magic.
+   * @param code The single line of code holding the three positional
+   *             arguments shown in the help text, plus any options
+   */
+  override def execute(code: String): Unit = {
+    val nonOptionArgs = parseArgs(code)
+    dependencyDownloader.setPrintStream(printStream)
+
+    // TODO: require a version or use the most recent if omitted?
+    if (nonOptionArgs.size != 3) {
+      printHelp(printStream, """%AddDeps my.company artifact-id version""")
+    } else {
+      val groupId = nonOptionArgs(0)
+      val artifactId = nonOptionArgs(1)
+      val version = nonOptionArgs(2)
+
+      // Fetch the jars and keep the locations they were stored at
+      val jarUrls = dependencyDownloader.retrieve(
+        groupId, artifactId, version, _transitive)
+
+      // Make the jars visible to both the interpreter and the spark context
+      interpreter.addJars(jarUrls: _*)
+      for (jarUrl <- jarUrls) sparkContext.addJar(jarUrl.getPath)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala
new file mode 100644
index 0000000..bdfdbe2
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/AddJar.scala
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import java.io.{File, PrintStream}
+import java.net.URL
+import java.nio.file.{Files, Paths}
+
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.builtin.AddJar._
+import com.ibm.spark.magic.dependencies._
+import com.ibm.spark.utils.{ArgumentParsingSupport, DownloadSupport, LogLike}
+import com.typesafe.config.Config
+
+/**
+ * Holds the directory that downloaded jars are stored in. The directory is
+ * resolved on the first call to getJarDir and reused afterwards.
+ */
+object AddJar {
+
+  // Resolved jar directory; None until getJarDir has been called once
+  private var jarDir: Option[String] = None
+
+  /**
+   * Returns the directory used to store downloaded jars. On the first call
+   * the "jar_dir" config entry is used when it is present and points to an
+   * existing path; otherwise a new temporary directory is created. Later
+   * calls return the value resolved by the first call.
+   *
+   * @param config The configuration that may contain a "jar_dir" entry
+   * @return The path of the jar directory
+   */
+  def getJarDir(config: Config): String = jarDir match {
+    case Some(cachedDir) =>
+      cachedDir
+    case None =>
+      val configuredDirExists =
+        config.hasPath("jar_dir") &&
+          Files.exists(Paths.get(config.getString("jar_dir")))
+
+      val resolvedDir =
+        if (configuredDirExists) {
+          config.getString("jar_dir")
+        } else {
+          Files.createTempDirectory("spark_kernel_add_jars")
+            .toFile.getAbsolutePath
+        }
+
+      jarDir = Some(resolvedDir)
+      resolvedDir
+  }
+}
+
+/**
+ * Line magic that downloads a jar (or zip) from a URL into the local jar
+ * directory and adds it either to the magic loader or to the interpreter and
+ * the Spark context.
+ */
+class AddJar
+  extends LineMagic with IncludeInterpreter with IncludeSparkContext
+  with IncludeOutputStream with DownloadSupport with ArgumentParsingSupport
+  with IncludeKernel with IncludeMagicLoader with IncludeConfig with LogLike
+{
+  // Option to mark re-downloading of jars
+  private val _force =
+    parser.accepts("f", "forces re-download of specified jar")
+
+  // Option to load the jar as a magic extension instead of adding it to the
+  // interpreter and spark context
+  private val _magic =
+    parser.accepts("magic", "loads jar as a magic extension")
+
+  // Lazy because the outputStream is not provided at construction
+  private lazy val printStream = new PrintStream(outputStream)
+
+  /**
+   * Retrieves file name from URL. Only the path component is considered, so
+   * a query string (e.g. "?token=abc") is not part of the returned name.
+   *
+   * @param location The remote location (URL)
+   * @return The name of the remote URL, or an empty string if one does not exist
+   */
+  def getFileFromLocation(location: String): String = {
+    val url = new URL(location)
+    // getPath rather than getFile: getFile appends the query string, which
+    // would end up in the file name and defeat the .jar/.zip extension check
+    val pathSegments = url.getPath.split("/")
+    if (pathSegments.length > 0) {
+      pathSegments.last
+    } else {
+      ""
+    }
+  }
+
+  /**
+   * Downloads and adds the specified jar to the
+   * interpreter/compiler/cluster classpaths. Prints the help text when the
+   * line does not contain exactly one non-empty jar location.
+   *
+   * @param code The line containing the location of the jar
+   * @throws IllegalArgumentException If the file name taken from the
+   *                                  location does not end in .jar or .zip
+   */
+  override def execute(code: String): Unit = {
+    val nonOptionArgs = parseArgs(code.trim)
+
+    // Exactly one non-empty positional argument (the jar URL) is required
+    if (nonOptionArgs.length != 1 || nonOptionArgs(0).isEmpty) {
+      printHelp(printStream, """%AddJar <jar_url>""")
+    } else {
+      downloadAndAddJar(nonOptionArgs(0))
+    }
+  }
+
+  /**
+   * Downloads the jar unless an existing local copy can be reused, then
+   * registers the local copy.
+   *
+   * @param jarRemoteLocation The remote location (URL) of the jar
+   */
+  private def downloadAndAddJar(jarRemoteLocation: String): Unit = {
+    // Get the destination of the jar
+    val jarName = getFileFromLocation(jarRemoteLocation)
+
+    // Ensure the URL actually contains a jar or zip file
+    if (!jarName.endsWith(".jar") && !jarName.endsWith(".zip")) {
+      throw new IllegalArgumentException(s"The jar file $jarName must end in .jar or .zip.")
+    }
+
+    val downloadLocation = getJarDir(config) + "/" + jarName
+
+    logger.debug( "Downloading jar to %s".format(downloadLocation) )
+
+    val fileDownloadLocation = new File(downloadLocation)
+
+    // Check if exists in cache or force applied
+    if (_force || !fileDownloadLocation.exists()) {
+      // Report beginning of download
+      printStream.println(s"Starting download from $jarRemoteLocation")
+
+      downloadFile(
+        new URL(jarRemoteLocation),
+        fileDownloadLocation.toURI.toURL
+      )
+
+      // Report download finished
+      printStream.println(s"Finished download of $jarName")
+    } else {
+      printStream.println(s"Using cached version of $jarName")
+    }
+
+    if (_magic) {
+      magicLoader.addJar(fileDownloadLocation.toURI.toURL)
+    } else {
+      interpreter.addJars(fileDownloadLocation.toURI.toURL)
+      sparkContext.addJar(fileDownloadLocation.getCanonicalPath)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/BuiltinLoader.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/BuiltinLoader.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/BuiltinLoader.scala
new file mode 100644
index 0000000..aecc90f
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/BuiltinLoader.scala
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import com.google.common.reflect.ClassPath
+import com.google.common.reflect.ClassPath.ClassInfo
+import com.ibm.spark.magic.InternalClassLoader
+import com.google.common.base.Strings._
+import scala.collection.JavaConversions._
+
+/**
+ * A class loader that discovers and loads the classes found in the package
+ * this loader itself belongs to (the builtin package).
+ */
+class BuiltinLoader
+  extends InternalClassLoader(classOf[BuiltinLoader].getClassLoader) {
+
+  // Name of the package containing this loader; the default package to scan
+  private val pkgName = this.getClass.getPackage.getName
+
+  /**
+   * Collects the ClassInfo of every top-level class in the given package,
+   * leaving out any class whose simple name equals this loader's own.
+   * @param pkg package name; a null or empty name yields an empty list
+   * @return list of ClassInfo objects
+   */
+  def getClasses(pkg: String = pkgName): List[ClassInfo] =
+    if (isNullOrEmpty(pkg)) {
+      Nil
+    } else {
+      // TODO: Decide if this.getClass.getClassLoader should just be this
+      val classPath = ClassPath.from(this.getClass.getClassLoader)
+      val ownSimpleName = this.getClass.getSimpleName
+
+      classPath.getTopLevelClasses(pkg)
+        .filter(classInfo => classInfo.getSimpleName != ownSimpleName)
+        .toList
+    }
+
+  /**
+   * Loads every class reported by getClasses for the given package.
+   * @param pkg package name
+   * @return list of Class[_] objects
+   */
+  def loadClasses(pkg: String = pkgName): List[Class[_]] =
+    getClasses(pkg).map(classInfo => classInfo.load())
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/Html.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/Html.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/Html.scala
new file mode 100644
index 0000000..95fa31a
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/Html.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import java.io.PrintStream
+
+import com.ibm.spark.kernel.protocol.v5.MIMEType
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.dependencies.IncludeOutputStream
+import com.ibm.spark.utils.ArgumentParsingSupport
+import com.google.common.base.Strings
+
+/**
+ * Cell magic that returns the given code as text/html output.
+ */
+class Html extends CellMagic with ArgumentParsingSupport
+  with IncludeOutputStream {
+
+  // Lazy because the outputStream is not provided at construction
+  private lazy val printStream = new PrintStream(outputStream)
+
+  /**
+   * Wraps the given code as HTML output.
+   * @param code The cell content to emit as HTML
+   * @return Output mapping the HTML MIME type to the code, or empty output
+   *         (after printing the help text) when the code is null or empty
+   */
+  override def execute(code: String): CellMagicOutput =
+    if (Strings.isNullOrEmpty(code)) {
+      printHelp(printStream, """%%Html <string_code>""")
+      CellMagicOutput()
+    } else {
+      CellMagicOutput(MIMEType.TextHtml -> code)
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/JavaScript.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/JavaScript.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/JavaScript.scala
new file mode 100644
index 0000000..42772c1
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/JavaScript.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import java.io.PrintStream
+
+import com.google.common.base.Strings
+import com.ibm.spark.kernel.protocol.v5.MIMEType
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.dependencies.IncludeOutputStream
+import com.ibm.spark.utils.ArgumentParsingSupport
+import org.slf4j.LoggerFactory
+
+/**
+ * Cell magic that returns the given code as application/javascript output.
+ */
+class JavaScript extends CellMagic with ArgumentParsingSupport
+  with IncludeOutputStream {
+
+  // Lazy because the outputStream is not provided at construction
+  private lazy val printStream = new PrintStream(outputStream)
+
+  /**
+   * Wraps the given code as JavaScript output.
+   * @param code The cell content to emit as JavaScript
+   * @return Output mapping the JavaScript MIME type to the code, or empty
+   *         output (after printing the help text) when the code is null or
+   *         empty
+   */
+  override def execute(code: String): CellMagicOutput = {
+    def printHelpAndReturn: CellMagicOutput = {
+      // This is a cell magic, so the usage text shows the double-percent
+      // prefix, matching the Html cell magic's help
+      printHelp(printStream, """%%JavaScript <string_code>""")
+      CellMagicOutput()
+    }
+
+    if (Strings.isNullOrEmpty(code)) {
+      printHelpAndReturn
+    } else {
+      CellMagicOutput(MIMEType.ApplicationJavaScript -> code)
+    }
+  }
+}