You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:26 UTC
[18/51] [abbrv] incubator-toree git commit: Moved scala files to new
locations based on new package
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/magic/MagicParser.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/magic/MagicParser.scala b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/magic/MagicParser.scala
deleted file mode 100644
index a24c062..0000000
--- a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/magic/MagicParser.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-package com.ibm.spark.kernel.protocol.v5.magic
-
-import com.ibm.spark.magic.MagicLoader
-
-class MagicParser(magicLoader: MagicLoader) {
- private val magicRegex = """^[%]{1,2}(\w+)""".r
- protected[magic] val kernelObjectName = "kernel.magics"
-
- /**
- * Determines whether a given line of code represents a line magic.
- * @param codeLine a single line of code
- * @return
- */
- private def isLineMagic(codeLine: String) = codeLine.startsWith("%") &&
- !isCellMagic(codeLine)
-
- /**
- * Determines whether a given string of code represents a cell magic.
- * @param codeBlob a string of code separated by newlines
- * @return
- */
- private def isCellMagic(codeBlob: String) = codeBlob.startsWith("%%")
-
- /**
- * Finds the first occurrence of a "magic string" i.e. "%%magic" or "%magic"
- * in a given code string, and separates the magic name from the code that
- * follows it.
- *
- * E.g.
- * "%magic foo bar" -> ("magic", "foo bar")
- * @param codeBlob a string of code separated by newlines
- * @return (magicName, args)
- */
- protected[magic] def parseMagic(codeBlob: String): Option[(String, String)] = {
- val matchData =
- magicRegex.findFirstMatchIn(codeBlob)
-
- matchData match {
- case Some(m) => Some((m.group(1), m.after(1).toString.trim))
- case None => None
- }
- }
-
- /**
- * Given a line of code representing a magic invocation determines whether
- * the magic has an implementation.
- * @param codeLine a single line of code
- * @return true if the magic exists and is a line magic
- */
- protected[magic] def isValidLineMagic(codeLine: String): Boolean = {
- parseMagic(codeLine) match {
- case Some((magicName, _)) =>
- isLineMagic(codeLine) && magicLoader.hasLineMagic(magicName)
- case None => false
- }
- }
-
- /**
- * Given a blob of code, finds any magic invocations of magics that don't
- * exist.
- * @param codeBlob a string of code separated by newlines
- * @return invalid magic names from the given code blob
- */
- protected[magic] def parseOutInvalidMagics(codeBlob: String): List[String] = {
- val lineMagics = codeBlob.split("\n").toList.filter(isLineMagic)
- lineMagics.filterNot(isValidLineMagic).map(line => {
- val (magicName, _) = parseMagic(line).get
- magicName
- })
- }
-
- /**
- * Formats a given magic name and args to code for a kernel method call.
- * @param magicName the name of the magic
- * @param args the arguments to the magic
- * @return equivalent kernel method call
- */
- protected[magic] def substitute(magicName: String, args: String): String =
- s"""$kernelObjectName.$magicName(\"\"\"$args\"\"\")"""
-
- /**
- * Formats a given line of code representing a line magic invocation into an
- * equivalent kernel object call if the magic invocation is valid.
- * @param codeLine the line of code to convert.
- * @return a substituted line of code if valid else the original line
- */
- protected[magic] def substituteLine(codeLine: String): String = {
- isValidLineMagic(codeLine) match {
- case true =>
- val (magicName, args) = parseMagic(codeLine).get
- substitute(magicName, args)
- case false => codeLine
- }
- }
-
- /**
- * Formats a given code blob representing a cell magic invocation into an
- * equivalent kernel object call if the cell magic invocation is valid. An
- * error message is returned if not.
- * @param codeBlob the blob of code representing a cell magic invocation
- * @return Left(the substituted code) or Right(error message)
- */
- protected[magic] def parseCell(codeBlob: String): Either[String, String] = {
- parseMagic(codeBlob.trim) match {
- case Some((cellMagicName, args)) =>
- magicLoader.hasCellMagic(cellMagicName) match {
- case true => Left(substitute(cellMagicName, args))
- case false => Right(s"Magic $cellMagicName does not exist!")
- }
- case None => Left(codeBlob)
- }
- }
-
- /**
- * Parses all lines in a given code blob and either substitutes equivalent
- * kernel object calls for each line magic in the code blob OR returns
- * an error message if any of the line magic invocations were invalid.
- * @param codeBlob a string of code separated by newlines
- * @return Left(code blob with substitutions) or Right(error message)
- */
- protected[magic] def parseLines(codeBlob: String): Either[String, String] = {
- val invalidMagics = parseOutInvalidMagics(codeBlob.trim)
- invalidMagics match {
- case Nil =>
- val substitutedCode = codeBlob.trim.split("\n").map(substituteLine)
- Left(substitutedCode.mkString("\n"))
- case _ =>
- Right(s"Magics [${invalidMagics.mkString(", ")}] do not exist!")
- }
- }
-
- /**
- * Parses a given code blob and returns an equivalent blob with substitutions
- * for magic invocations, if any, or an error string.
- * @param codeBlob the blob of code to parse
- * @return Left(parsed code) or Right(error message)
- */
- def parse(codeBlob: String): Either[String, String] = {
- val trimCodeBlob = codeBlob.trim
- isCellMagic(trimCodeBlob) match {
- case true => parseCell(trimCodeBlob)
- case false => parseLines(trimCodeBlob)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/magic/PostProcessor.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/magic/PostProcessor.scala b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/magic/PostProcessor.scala
deleted file mode 100644
index 73c3c9f..0000000
--- a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/magic/PostProcessor.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.ibm.spark.kernel.protocol.v5.magic
-
-import com.ibm.spark.interpreter.{ExecuteOutput, Interpreter}
-import com.ibm.spark.kernel.protocol.v5.{Data, MIMEType}
-import com.ibm.spark.magic.{CellMagicOutput, LineMagicOutput}
-import com.ibm.spark.utils.LogLike
-
-class PostProcessor(interpreter: Interpreter) extends LogLike {
- val defaultErr = "Something went wrong in postprocessor!"
-
- def process(codeOutput: ExecuteOutput): Data = {
- interpreter.lastExecutionVariableName.flatMap(interpreter.read) match {
- case Some(l: Left[_, _]) => matchCellMagic(codeOutput, l)
- case Some(r: Right[_, _]) => matchLineMagic(codeOutput, r)
- case _ => Data(MIMEType.PlainText -> codeOutput)
- }
- }
-
- protected[magic] def matchCellMagic(code: String, l: Left[_,_]) =
- l.left.getOrElse(None) match {
- case cmo: CellMagicOutput => cmo
- case _ => Data(MIMEType.PlainText -> code)
- }
-
- protected[magic] def matchLineMagic(code: String, r: Right[_,_]) =
- r.right.getOrElse(None) match {
- case lmo: LineMagicOutput => processLineMagic(code)
- case _ => Data(MIMEType.PlainText -> code)
- }
-
- protected[magic] def processLineMagic(code: String): Data = {
- val parts = code.split("\n")
- Data(MIMEType.PlainText -> parts.take(parts.size - 1).mkString("\n"))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/relay/ExecuteRequestRelay.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/relay/ExecuteRequestRelay.scala b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/relay/ExecuteRequestRelay.scala
deleted file mode 100644
index a1b846a..0000000
--- a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/relay/ExecuteRequestRelay.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.relay
-
-import java.io.OutputStream
-
-import akka.actor.Actor
-import akka.pattern._
-import akka.util.Timeout
-import com.ibm.spark.interpreter.{ExecuteAborted, ExecuteError, ExecuteFailure, ExecuteOutput}
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.content._
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.magic.{PostProcessor, MagicParser}
-import com.ibm.spark.magic.MagicLoader
-import com.ibm.spark.utils.LogLike
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-
-case class ExecuteRequestRelay(
- actorLoader: ActorLoader,
- magicLoader: MagicLoader,
- magicParser: MagicParser,
- postProcessor: PostProcessor
-)
- extends Actor with LogLike
-{
- import context._
- implicit val timeout = Timeout(21474835.seconds)
-
- /**
- * Takes an ExecuteFailure and (ExecuteReply, ExecuteResult) with contents
- * dictated by the type of failure (either an error or an abort).
- * @param failure the failure
- * @return (ExecuteReply, ExecuteResult)
- */
- private def failureMatch(failure: ExecuteFailure) =
- failure match {
- case err: ExecuteError =>
- val error = ExecuteReplyError(
- 1, Some(err.name), Some(err.value), Some(err.stackTrace)
- )
- val result =
- ExecuteResult(1, Data(MIMEType.PlainText -> err.toString), Metadata())
- (error, result)
-
- case _: ExecuteAborted =>
- val abort = ExecuteReplyAbort(1)
- val result = ExecuteResult(1, Data(), Metadata())
- (abort, result)
- }
-
- /**
- * Packages the response into an ExecuteReply,ExecuteResult tuple.
- * @param future The future containing either the output or failure
- * @return The tuple representing the proper response
- */
- private def packageFutureResponse(
- future: Future[Either[ExecuteOutput, ExecuteFailure]]
- ): Future[(ExecuteReply, ExecuteResult)] = future.map { value =>
- if (value.isLeft) {
- val output = value.left.get
- val data = postProcessor.process(output)
- (
- ExecuteReplyOk(1, Some(Payloads()), Some(UserExpressions())),
- ExecuteResult(1, data, Metadata())
- )
- } else {
- failureMatch(value.right.get)
- }
- }
-
- override def receive: Receive = {
- case (executeRequest: ExecuteRequest, parentMessage: KernelMessage,
- outputStream: OutputStream) =>
- val interpreterActor = actorLoader.load(SystemActorType.Interpreter)
-
- // Store our old sender so we don't lose it in the callback
- // NOTE: Should point back to our KernelMessageRelay
- val oldSender = sender()
-
- // Sets the outputStream for this particular ExecuteRequest
- magicLoader.dependencyMap.setOutputStream(outputStream)
-
- // Parse the code for magics before sending it to the interpreter and
- // pipe the response to sender
- (magicParser.parse(executeRequest.code) match {
- case Left(code) =>
- val parsedRequest =
- (executeRequest.copy(code = code), parentMessage, outputStream)
- val interpreterFuture = (interpreterActor ? parsedRequest)
- .mapTo[Either[ExecuteOutput, ExecuteFailure]]
- packageFutureResponse(interpreterFuture)
-
- case Right(error) =>
- val failure = ExecuteError("Error parsing magics!", error, Nil)
- Future { failureMatch(failure) }
- }) pipeTo oldSender
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/relay/KernelMessageRelay.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/relay/KernelMessageRelay.scala b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/relay/KernelMessageRelay.scala
deleted file mode 100644
index cc45479..0000000
--- a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/relay/KernelMessageRelay.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.relay
-
-import akka.pattern.ask
-import akka.util.Timeout
-import com.ibm.spark.communication.security.SecurityActorType
-import com.ibm.spark.communication.utils.OrderedSupport
-import com.ibm.spark.kernel.protocol.v5.MessageType.MessageType
-import com.ibm.spark.kernel.protocol.v5.content.ShutdownRequest
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.{KernelMessage, MessageType, _}
-import com.ibm.spark.utils.MessageLogSupport
-import scala.collection.immutable.HashMap
-import scala.concurrent.duration._
-import scala.util.{Random, Failure, Success}
-
-/**
- * This class is meant to be a relay for send KernelMessages through kernel
- * system.
- * @param actorLoader The ActorLoader used by this class for finding actors for
- * relaying messages
- * @param incomingSpecialCases The special cases for incoming messages
- * @param outgoingSpecialCases The special cases for outgoing messages
- * @param useSignatureManager Whether or not to use signature verification and
- * generation
- */
-case class KernelMessageRelay(
- actorLoader: ActorLoader,
- useSignatureManager: Boolean,
- incomingSpecialCases: Map[String, String] = new HashMap[String, String](),
- outgoingSpecialCases: Map[String, String] = new HashMap[String, String]()
-) extends OrderedSupport with MessageLogSupport {
- // NOTE: Required to provide the execution context for futures with akka
- import context._
-
- // NOTE: Required for ask (?) to function... maybe can define elsewhere?
- implicit val timeout = Timeout(5.seconds)
-
- // Flag indicating if can receive messages (or add them to buffer)
- var isReady = false
-
- def this(actorLoader: ActorLoader) =
- this(actorLoader, true)
-
- /**
- * Relays a KernelMessage to a specific actor to handle that message.
- *
- * @param messageType The enumeration representing the message type
- * @param kernelMessage The message to relay
- */
- private def relay(messageType: MessageType, kernelMessage: KernelMessage) = {
- logger.debug("Relaying message type of " + messageType.toString)
- logKernelMessageAction("Relaying", kernelMessage)
- actorLoader.load(messageType) ! kernelMessage
- }
-
- private def incomingRelay(kernelMessage: KernelMessage) = {
- var messageTypeString = kernelMessage.header.msg_type
-
- // If this is a special case, transform the message type accordingly
- if (incomingSpecialCases.contains(messageTypeString)) {
- logger.debug(s"$messageTypeString is a special incoming case!")
- messageTypeString = incomingSpecialCases(messageTypeString)
- }
-
- relay(MessageType.withName(messageTypeString), kernelMessage)
- }
-
- private def outgoingRelay(kernelMessage: KernelMessage) = {
- var messageTypeString = kernelMessage.header.msg_type
-
- // If this is a special case, transform the message type accordingly
- if (outgoingSpecialCases.contains(messageTypeString)) {
- logger.debug(s"$messageTypeString is a special outgoing case!")
- messageTypeString = outgoingSpecialCases(messageTypeString)
- }
-
- relay(MessageType.withName(messageTypeString), kernelMessage)
- }
-
-
- /**
- * This actor will receive and handle two types; ZMQMessage and KernelMessage.
- * These messages will be forwarded to the actors that are responsible for them.
- */
- override def receive = {
- // TODO: How to restore this when the actor dies?
- // Update ready status
- case ready: Boolean =>
- isReady = ready
- if (isReady) {
- logger.info("Unstashing all messages received!")
- unstashAll()
- logger.info("Relay is now fully ready to receive messages!")
- } else {
- logger.info("Relay is now disabled!")
- }
-
-
- // Add incoming messages (when not ready) to buffer to be processed
- case (zmqStrings: Seq[_], kernelMessage: KernelMessage) if !isReady && kernelMessage.header.msg_type != ShutdownRequest.toTypeString =>
- logger.info("Not ready for messages! Stashing until ready!")
- stash()
-
- // Assuming these messages are incoming messages
- case (zmqStrings: Seq[_], kernelMessage: KernelMessage) if isReady || kernelMessage.header.msg_type == ShutdownRequest.toTypeString =>
- startProcessing()
- if (useSignatureManager) {
- logger.trace(s"Verifying signature for incoming message " +
- s"${kernelMessage.header.msg_id}")
- val signatureManager =
- actorLoader.load(SecurityActorType.SignatureManager)
- val signatureVerificationFuture = signatureManager ? (
- (kernelMessage.signature, zmqStrings)
- )
-
- signatureVerificationFuture.mapTo[Boolean].onComplete {
- case Success(true) =>
- incomingRelay(kernelMessage)
- finishedProcessing()
- case Success(false) =>
- // TODO: Figure out what the failure message structure should be!
- logger.error(s"Invalid signature received from message " +
- s"${kernelMessage.header.msg_id}!")
- finishedProcessing()
- case Failure(t) =>
- logger.error("Failure when verifying signature!", t)
- finishedProcessing()
- }
- } else {
- logger.debug(s"Relaying incoming message " +
- s"${kernelMessage.header.msg_id} without SignatureManager")
- incomingRelay(kernelMessage)
- finishedProcessing()
- }
-
- // Assuming all kernel messages without zmq strings are outgoing
- case kernelMessage: KernelMessage =>
- startProcessing()
- if (useSignatureManager) {
- logger.trace(s"Creating signature for outgoing message " +
- s"${kernelMessage.header.msg_id}")
- val signatureManager = actorLoader.load(SecurityActorType.SignatureManager)
- val signatureInsertFuture = signatureManager ? kernelMessage
-
- // TODO: Handle error case for mapTo and non-present onFailure
- signatureInsertFuture.mapTo[KernelMessage] onSuccess {
- case message =>
- outgoingRelay(message)
- finishedProcessing()
- }
- } else {
- logger.debug(s"Relaying outgoing message " +
- s"${kernelMessage.header.msg_id} without SignatureManager")
- outgoingRelay(kernelMessage)
- finishedProcessing()
- }
- }
-
- override def orderedTypes(): Seq[Class[_]] = Seq(
- classOf[(Seq[_], KernelMessage)],
- classOf[KernelMessage]
- )
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/stream/KernelInputStream.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/stream/KernelInputStream.scala b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/stream/KernelInputStream.scala
deleted file mode 100644
index e57fd84..0000000
--- a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/stream/KernelInputStream.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.stream
-
-import java.io.InputStream
-import java.nio.charset.Charset
-
-import akka.pattern.ask
-import com.ibm.spark.kernel.protocol.v5.content.InputRequest
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.kernel.Utilities.timeout
-import com.ibm.spark.kernel.protocol.v5.{KMBuilder, MessageType}
-
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future}
-
-import KernelInputStream._
-
-object KernelInputStream {
- val DefaultPrompt = ""
- val DefaultPassword = false
-}
-
-/**
- * Represents an OutputStream that sends data back to the clients connect to the
- * kernel instance.
- *
- * @param actorLoader The actor loader used to access the message relay
- * @param kmBuilder The KMBuilder used to construct outgoing kernel messages
- * @param prompt The prompt to use for input requests
- * @param password Whether or not the input request is for a password
- */
-class KernelInputStream(
- actorLoader: ActorLoader,
- kmBuilder: KMBuilder,
- prompt: String = DefaultPrompt,
- password: Boolean = DefaultPassword
-) extends InputStream {
- private val EncodingType = Charset.forName("UTF-8")
- @volatile private var internalBytes: ListBuffer[Byte] = ListBuffer()
-
- /**
- * Returns the number of bytes available before the next request is made
- * for more data.
- * @return The total number of bytes in the internal buffer
- */
- override def available(): Int = internalBytes.length
-
- /**
- * Requests the next byte of data from the client. If the buffer containing
- * @return The byte of data as an integer
- */
- override def read(): Int = {
- if (!this.hasByte) this.requestBytes()
-
- this.nextByte()
- }
-
- private def hasByte: Boolean = internalBytes.nonEmpty
-
- private def nextByte(): Int = {
- val byte = internalBytes.head
-
- internalBytes = internalBytes.tail
-
- byte
- }
-
- private def requestBytes(): Unit = {
- val inputRequest = InputRequest(prompt, password)
- // NOTE: Assuming already provided parent header and correct ids
- val kernelMessage = kmBuilder
- .withHeader(MessageType.Outgoing.InputRequest)
- .withContentString(inputRequest)
- .build
-
- // NOTE: The same handler is being used in both request and reply
- val responseFuture: Future[String] =
- (actorLoader.load(MessageType.Incoming.InputReply) ? kernelMessage)
- .mapTo[String]
-
- // Block until we get a response
- import scala.concurrent.duration._
- internalBytes ++=
- Await.result(responseFuture, Duration.Inf).getBytes(EncodingType)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/stream/KernelOutputStream.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/stream/KernelOutputStream.scala b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/stream/KernelOutputStream.scala
deleted file mode 100644
index 56b0cbb..0000000
--- a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/stream/KernelOutputStream.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.stream
-
-import java.io.OutputStream
-import java.nio.charset.Charset
-
-import com.ibm.spark.kernel.protocol.v5.content.StreamContent
-import com.ibm.spark.kernel.protocol.v5.{SystemActorType, MessageType, KMBuilder}
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import com.ibm.spark.utils.{LogLike, ScheduledTaskManager}
-import scala.collection.mutable.ListBuffer
-import KernelOutputStream._
-
-object KernelOutputStream {
- val DefaultStreamType = "stdout"
- val DefaultSendEmptyOutput = false
-}
-
-/**
- * Represents an OutputStream that sends data back to the clients connect to the
- * kernel instance.
- *
- * @param actorLoader The actor loader used to access the message relay
- * @param kmBuilder The KMBuilder used to construct outgoing kernel messages
- * @param scheduledTaskManager The task manager used to schedule periodic
- * flushes to send data across the wire
- * @param streamType The type of stream (stdout/stderr)
- * @param sendEmptyOutput If true, will allow empty output to be flushed and
- * sent out to listening clients
- */
-class KernelOutputStream(
- private val actorLoader: ActorLoader,
- private val kmBuilder: KMBuilder,
- private val scheduledTaskManager: ScheduledTaskManager,
- private val streamType: String = DefaultStreamType,
- private val sendEmptyOutput: Boolean = DefaultSendEmptyOutput
-) extends OutputStream with LogLike {
- private val EncodingType = Charset.forName("UTF-8")
- @volatile private var internalBytes: ListBuffer[Byte] = ListBuffer()
-
- private var taskId: String = _
-
- private def enableAutoFlush() =
- if (taskId == null) {
- logger.trace("Enabling auto flush")
- taskId = scheduledTaskManager.addTask(task = this.flush())
- }
-
- private def disableAutoFlush() =
- if (taskId != null) {
- logger.trace("Disabling auto flush")
- scheduledTaskManager.removeTask(taskId)
- taskId = null
- }
-
- /**
- * Takes the current byte array contents in memory, packages them up into a
- * KernelMessage, and sends the message to the KernelMessageRelay.
- */
- override def flush(): Unit = {
- val contents = internalBytes.synchronized {
- logger.trace("Getting content to flush")
- val bytesToString = new String(internalBytes.toArray, EncodingType)
-
- // Clear the internal buffer
- internalBytes.clear()
-
- // Stop the auto-flushing
- disableAutoFlush()
-
- bytesToString
- }
-
- // Avoid building and sending a kernel message if the contents (when
- // trimmed) are empty and the flag to send anyway is disabled
- if (!sendEmptyOutput && contents.trim.isEmpty) {
- val contentsWithVisibleWhitespace = contents
- .replace("\n", "\\n")
- .replace("\t", "\\t")
- .replace("\r", "\\r")
- .replace(" ", "\\s")
- logger.warn(s"Suppressing empty output: '$contentsWithVisibleWhitespace'")
- return
- }
-
- logger.trace(s"Content to flush: '$contents'")
-
- val streamContent = StreamContent(
- streamType, contents
- )
-
- val kernelMessage = kmBuilder
- .withIds(Seq(MessageType.Outgoing.Stream.toString))
- .withHeader(MessageType.Outgoing.Stream)
- .withContentString(streamContent).build
-
- actorLoader.load(SystemActorType.KernelMessageRelay) ! kernelMessage
-
- // Ensure any underlying implementation is processed
- super.flush()
- }
-
- /**
- * Adds the specified byte to the end of the internal buffer. The most
- * significant 24 bits are ignored. Only the least significant 8 bits
- * are appended.
- * @param b The byte whose least significant 8 bits are to be appended
- */
- override def write(b: Int): Unit = internalBytes.synchronized {
- // Begin periodic flushing if this is a new set of bytes
- enableAutoFlush()
-
- internalBytes += b.toByte
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/magic/builtin/AddDeps.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/magic/builtin/AddDeps.scala b/kernel/src/main/scala/com/ibm/spark/magic/builtin/AddDeps.scala
deleted file mode 100644
index 5555c08..0000000
--- a/kernel/src/main/scala/com/ibm/spark/magic/builtin/AddDeps.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.magic.builtin
-
-import java.io.PrintStream
-
-import com.ibm.spark.magic._
-import com.ibm.spark.magic.dependencies._
-import com.ibm.spark.utils.ArgumentParsingSupport
-
-class AddDeps extends LineMagic with IncludeInterpreter
- with IncludeOutputStream with IncludeSparkContext with ArgumentParsingSupport
- with IncludeDependencyDownloader with IncludeKernel
-{
-
- private lazy val printStream = new PrintStream(outputStream)
-
- val _transitive =
- parser.accepts("transitive", "retrieve dependencies recursively")
-
- /**
- * Execute a magic representing a line magic.
- * @param code The single line of code
- * @return The output of the magic
- */
- override def execute(code: String): Unit = {
- val nonOptionArgs = parseArgs(code)
- dependencyDownloader.setPrintStream(printStream)
-
- // TODO: require a version or use the most recent if omitted?
- if (nonOptionArgs.size == 3) {
- // get the jars and hold onto the paths at which they reside
- val urls = dependencyDownloader.retrieve(
- nonOptionArgs(0), nonOptionArgs(1), nonOptionArgs(2), _transitive)
-
- // add the jars to the interpreter and spark context
- interpreter.addJars(urls:_*)
- urls.foreach(url => sparkContext.addJar(url.getPath))
- } else {
- printHelp(printStream, """%AddDeps my.company artifact-id version""")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/magic/builtin/AddJar.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/magic/builtin/AddJar.scala b/kernel/src/main/scala/com/ibm/spark/magic/builtin/AddJar.scala
deleted file mode 100644
index bdfdbe2..0000000
--- a/kernel/src/main/scala/com/ibm/spark/magic/builtin/AddJar.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.magic.builtin
-
-import java.io.{File, PrintStream}
-import java.net.URL
-import java.nio.file.{Files, Paths}
-
-import com.ibm.spark.magic._
-import com.ibm.spark.magic.builtin.AddJar._
-import com.ibm.spark.magic.dependencies._
-import com.ibm.spark.utils.{ArgumentParsingSupport, DownloadSupport, LogLike}
-import com.typesafe.config.Config
-
-object AddJar {
-
- private var jarDir:Option[String] = None
- def getJarDir(config: Config): String = {
- jarDir.getOrElse({
- jarDir = Some(
- if(config.hasPath("jar_dir") && Files.exists(Paths.get(config.getString("jar_dir")))) {
- config.getString("jar_dir")
- } else {
- Files.createTempDirectory("spark_kernel_add_jars").toFile.getAbsolutePath
- }
- )
- jarDir.get
- })
- }
-}
-
-class AddJar
- extends LineMagic with IncludeInterpreter with IncludeSparkContext
- with IncludeOutputStream with DownloadSupport with ArgumentParsingSupport
- with IncludeKernel with IncludeMagicLoader with IncludeConfig with LogLike
-{
- // Option to mark re-downloading of jars
- private val _force =
- parser.accepts("f", "forces re-download of specified jar")
-
- // Option to mark re-downloading of jars
- private val _magic =
- parser.accepts("magic", "loads jar as a magic extension")
-
- // Lazy because the outputStream is not provided at construction
- private lazy val printStream = new PrintStream(outputStream)
-
- /**
- * Retrieves file name from URL.
- *
- * @param location The remote location (URL)
- * @return The name of the remote URL, or an empty string if one does not exist
- */
- def getFileFromLocation(location: String): String = {
- val url = new URL(location)
- val file = url.getFile.split("/")
- if (file.length > 0) {
- file.last
- } else {
- ""
- }
- }
-
- /**
- * Downloads and adds the specified jar to the
- * interpreter/compiler/cluster classpaths.
- *
- * @param code The line containing the location of the jar
- */
- override def execute(code: String): Unit = {
- val nonOptionArgs = parseArgs(code.trim)
-
- // Check valid arguments
- if (nonOptionArgs.length != 1) {
- printHelp(printStream, """%AddJar <jar_url>""")
- return
- }
-
- // Check if the jar we want to download is valid
- val jarRemoteLocation = nonOptionArgs(0)
- if (jarRemoteLocation.isEmpty) {
- printHelp(printStream, """%AddJar <jar_url>""")
- return
- }
-
- // Get the destination of the jar
- val jarName = getFileFromLocation(jarRemoteLocation)
-
- // Ensure the URL actually contains a jar or zip file
- if (!jarName.endsWith(".jar") && !jarName.endsWith(".zip")) {
- throw new IllegalArgumentException(s"The jar file $jarName must end in .jar or .zip.")
- }
-
- val downloadLocation = getJarDir(config) + "/" + jarName
-
- logger.debug( "Downloading jar to %s".format(downloadLocation) )
-
- val fileDownloadLocation = new File(downloadLocation)
-
- // Check if exists in cache or force applied
- if (_force || !fileDownloadLocation.exists()) {
- // Report beginning of download
- printStream.println(s"Starting download from $jarRemoteLocation")
-
- downloadFile(
- new URL(jarRemoteLocation),
- new File(downloadLocation).toURI.toURL
- )
-
- // Report download finished
- printStream.println(s"Finished download of $jarName")
- } else {
- printStream.println(s"Using cached version of $jarName")
- }
-
-
- if (_magic)
- {
-
- magicLoader.addJar(fileDownloadLocation.toURI.toURL)
-
- }
- else
- {
- interpreter.addJars(fileDownloadLocation.toURI.toURL)
- sparkContext.addJar(fileDownloadLocation.getCanonicalPath)
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/magic/builtin/BuiltinLoader.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/magic/builtin/BuiltinLoader.scala b/kernel/src/main/scala/com/ibm/spark/magic/builtin/BuiltinLoader.scala
deleted file mode 100644
index aecc90f..0000000
--- a/kernel/src/main/scala/com/ibm/spark/magic/builtin/BuiltinLoader.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.magic.builtin
-
-import com.google.common.reflect.ClassPath
-import com.google.common.reflect.ClassPath.ClassInfo
-import com.ibm.spark.magic.InternalClassLoader
-import com.google.common.base.Strings._
-import scala.collection.JavaConversions._
-
-/**
- * Represents a class loader that loads classes from the builtin package.
- */
-class BuiltinLoader
- extends InternalClassLoader(classOf[BuiltinLoader].getClassLoader) {
-
- private val pkgName = this.getClass.getPackage.getName
-
- /**
- * Provides a list of ClassInfo objects for each class in the specified
- * package.
- * @param pkg package name
- * @return list of ClassInfo objects
- */
- def getClasses(pkg: String = pkgName): List[ClassInfo] = {
- isNullOrEmpty(pkg) match {
- case true =>
- List()
- case false =>
- // TODO: Decide if this.getClass.getClassLoader should just be this
- val classPath = ClassPath.from(this.getClass.getClassLoader)
- classPath.getTopLevelClasses(pkg).filter(
- _.getSimpleName != this.getClass.getSimpleName
- ).toList
- }
- }
-
- /**
- * Provides a list of Class[_] objects for each class in the specified
- * package.
- * @param pkg package name
- * @return list of Class[_] objects
- */
- def loadClasses(pkg: String = pkgName): List[Class[_]] =
- getClasses(pkg).map(_.load())
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/magic/builtin/Html.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/magic/builtin/Html.scala b/kernel/src/main/scala/com/ibm/spark/magic/builtin/Html.scala
deleted file mode 100644
index 95fa31a..0000000
--- a/kernel/src/main/scala/com/ibm/spark/magic/builtin/Html.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.magic.builtin
-
-import java.io.PrintStream
-
-import com.ibm.spark.kernel.protocol.v5.MIMEType
-import com.ibm.spark.magic._
-import com.ibm.spark.magic.dependencies.IncludeOutputStream
-import com.ibm.spark.utils.ArgumentParsingSupport
-import com.google.common.base.Strings
-
-class Html extends CellMagic with ArgumentParsingSupport
- with IncludeOutputStream {
-
- // Lazy because the outputStream is not provided at construction
- private lazy val printStream = new PrintStream(outputStream)
-
- override def execute(code: String): CellMagicOutput = {
- def printHelpAndReturn: CellMagicOutput = {
- printHelp(printStream, """%%Html <string_code>""")
- CellMagicOutput()
- }
-
- Strings.isNullOrEmpty(code) match {
- case true => printHelpAndReturn
- case false => CellMagicOutput(MIMEType.TextHtml -> code)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/magic/builtin/JavaScript.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/magic/builtin/JavaScript.scala b/kernel/src/main/scala/com/ibm/spark/magic/builtin/JavaScript.scala
deleted file mode 100644
index 42772c1..0000000
--- a/kernel/src/main/scala/com/ibm/spark/magic/builtin/JavaScript.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.magic.builtin
-
-import java.io.PrintStream
-
-import com.google.common.base.Strings
-import com.ibm.spark.kernel.protocol.v5.MIMEType
-import com.ibm.spark.magic._
-import com.ibm.spark.magic.dependencies.IncludeOutputStream
-import com.ibm.spark.utils.ArgumentParsingSupport
-import org.slf4j.LoggerFactory
-
-class JavaScript extends CellMagic with ArgumentParsingSupport
- with IncludeOutputStream {
-
- // Lazy because the outputStream is not provided at construction
- private lazy val printStream = new PrintStream(outputStream)
-
- override def execute(code: String): CellMagicOutput = {
- def printHelpAndReturn: CellMagicOutput = {
- printHelp(printStream, """%JavaScript <string_code>""")
- CellMagicOutput()
- }
-
- Strings.isNullOrEmpty(code) match {
- case true => printHelpAndReturn
- case false => CellMagicOutput(MIMEType.ApplicationJavaScript -> code)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/magic/builtin/LSMagic.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/magic/builtin/LSMagic.scala b/kernel/src/main/scala/com/ibm/spark/magic/builtin/LSMagic.scala
deleted file mode 100644
index db99cc1..0000000
--- a/kernel/src/main/scala/com/ibm/spark/magic/builtin/LSMagic.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.magic.builtin
-
-import java.io.PrintStream
-
-import com.ibm.spark.magic._
-import com.ibm.spark.magic.dependencies.IncludeOutputStream
-
-class LSMagic extends LineMagic with IncludeOutputStream {
-
- private lazy val printStream = new PrintStream(outputStream)
-
- /**
- * Lists all available magics.
- * @param code The single line of code
- * @return The output of the magic
- */
- override def execute(code: String): Unit = {
- val classes = new BuiltinLoader().loadClasses().toList
- val lineMagics = magicNames("%", classOf[LineMagic], classes)
- .mkString(" ").toLowerCase
- val cellMagics = magicNames("%%", classOf[CellMagic], classes)
- .mkString(" ").toLowerCase
- val message =
- s"""|Available line magics:
- |$lineMagics
- |
- |Available cell magics:
- |$cellMagics
- |
- |Type %<magic_name> for usage info.
- """.stripMargin
-
- printStream.println(message)
- }
-
- /**
- * Provides a list of class names from the given list that implement
- * the specified interface, with the specified prefix prepended.
- * @param prefix prepended to each name, e.g. "%%"
- * @param interface a magic interface, e.g. classOf[LineMagic]
- * @param classes a list of magic classes
- * @return list of class names with prefix
- */
- protected[magic] def magicNames(prefix: String, interface: Class[_],
- classes: List[Class[_]]) : List[String] = {
- val filteredClasses = classes.filter(_.getInterfaces.contains(interface))
- filteredClasses.map(c => s"${prefix}${c.getSimpleName}")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/magic/builtin/RDD.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/magic/builtin/RDD.scala b/kernel/src/main/scala/com/ibm/spark/magic/builtin/RDD.scala
deleted file mode 100644
index dbee517..0000000
--- a/kernel/src/main/scala/com/ibm/spark/magic/builtin/RDD.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.magic.builtin
-
-import com.ibm.spark.interpreter.{ExecuteFailure, Results, ExecuteAborted, ExecuteError}
-import com.ibm.spark.kernel.protocol.v5.MIMEType
-import com.ibm.spark.magic._
-import com.ibm.spark.magic.dependencies.{IncludeKernelInterpreter, IncludeInterpreter}
-import com.ibm.spark.utils.LogLike
-import com.ibm.spark.utils.json.RddToJson
-import org.apache.spark.sql.SchemaRDD
-
-/**
- * Temporary magic to show an RDD as JSON
- */
-class RDD extends CellMagic with IncludeKernelInterpreter with LogLike {
-
- private def convertToJson(code: String) = {
- val (result, message) = kernelInterpreter.interpret(code)
- result match {
- case Results.Success =>
- val rddVarName = kernelInterpreter.lastExecutionVariableName.getOrElse("")
- kernelInterpreter.read(rddVarName).map(rddVal => {
- try{
- CellMagicOutput(MIMEType.ApplicationJson -> RddToJson.convert(rddVal.asInstanceOf[SchemaRDD]))
- } catch {
- case _: Throwable =>
- CellMagicOutput(MIMEType.PlainText -> s"Could note convert RDD to JSON: ${rddVarName}->${rddVal}")
- }
- }).getOrElse(CellMagicOutput(MIMEType.PlainText -> "No RDD Value found!"))
- case _ =>
- val errorMessage = message.right.toOption match {
- case Some(executeFailure) => executeFailure match {
- case _: ExecuteAborted => throw new Exception("RDD magic aborted!")
- case executeError: ExecuteError => throw new Exception(executeError.value)
- }
- case _ => "No error information available!"
- }
- logger.error(s"Error retrieving RDD value: ${errorMessage}")
- CellMagicOutput(MIMEType.PlainText ->
- (s"An error occurred converting RDD to JSON.\n${errorMessage}"))
- }
- }
-
- override def execute(code: String): CellMagicOutput =
- convertToJson(code)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/magic/builtin/ShowTypes.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/magic/builtin/ShowTypes.scala b/kernel/src/main/scala/com/ibm/spark/magic/builtin/ShowTypes.scala
deleted file mode 100644
index 47d4f65..0000000
--- a/kernel/src/main/scala/com/ibm/spark/magic/builtin/ShowTypes.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.magic.builtin
-
-import com.ibm.spark.magic.LineMagic
-import com.ibm.spark.magic.dependencies.IncludeOutputStream
-import java.io.PrintStream
-import com.ibm.spark.kernel.api.KernelOptions
-
-
-class ShowTypes extends LineMagic with IncludeOutputStream {
- private lazy val printStream = new PrintStream(outputStream)
-
- override def execute(code: String): Unit = {
- code match {
- case "on" =>
- printStream.println(s"Types will be printed.")
- KernelOptions.showTypes = true
- case "off" =>
- printStream.println(s"Types will not be printed")
- KernelOptions.showTypes = false
- case "" =>
- printStream.println(s"ShowTypes is currently ${if (KernelOptions.showTypes) "on" else "off"} ")
- case other =>
- printStream.println(s"${other} is not a valid option for the ShowTypes magic.")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/magic/builtin/Truncation.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/magic/builtin/Truncation.scala b/kernel/src/main/scala/com/ibm/spark/magic/builtin/Truncation.scala
deleted file mode 100644
index d30736e..0000000
--- a/kernel/src/main/scala/com/ibm/spark/magic/builtin/Truncation.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.magic.builtin
-
-import com.ibm.spark.magic.LineMagic
-import com.ibm.spark.magic.dependencies.IncludeOutputStream
-import java.io.PrintStream
-import com.ibm.spark.kernel.api.KernelOptions
-
-
-class Truncation extends LineMagic with IncludeOutputStream {
- private lazy val printStream = new PrintStream(outputStream)
-
- override def execute(code: String): Unit = {
- code match {
- case "on" =>
- printStream.println(s"Output WILL be truncated.")
- KernelOptions.noTruncation = false
- case "off" =>
- printStream.println(s"Output will NOT be truncated")
- KernelOptions.noTruncation = true
- case "" =>
- printStream.println(s"Truncation is currently ${if (KernelOptions.noTruncation) "off" else "on"} ")
- case other =>
- printStream.println(s"${other} is not a valid option for the NoTruncation magic.")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/utils/MessageLogSupport.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/utils/MessageLogSupport.scala b/kernel/src/main/scala/com/ibm/spark/utils/MessageLogSupport.scala
deleted file mode 100644
index 05c2216..0000000
--- a/kernel/src/main/scala/com/ibm/spark/utils/MessageLogSupport.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.utils
-
-import com.ibm.spark.kernel.protocol.v5.{MessageType, KernelMessage}
-
-trait MessageLogSupport extends LogLike {
- /**
- * Logs various pieces of a KernelMessage at different levels of logging.
- * @param km
- */
- def logMessage(km: KernelMessage): Unit = {
- logger.trace(s"Kernel message ids: ${km.ids}")
- logger.trace(s"Kernel message signature: ${km.signature}")
- logger.debug(s"Kernel message header id: ${km.header.msg_id}")
- logger.debug(s"Kernel message header type: ${km.header.msg_type}")
- val incomingMessage = isIncomingMessage(km.header.msg_type)
- (km.parentHeader, incomingMessage) match {
- case (null, true) => // Don't do anything, this is expected
- case (null, false) => // Messages coming from the kernel should have parent headers
- logger.warn(s"Parent header is null for message ${km.header.msg_id} " +
- s"of type ${km.header.msg_type}")
- case _ =>
- logger.trace(s"Kernel message parent id: ${km.parentHeader.msg_id}")
- logger.trace(s"Kernel message parent type: ${km.parentHeader.msg_type}")
- }
- logger.trace(s"Kernel message metadata: ${km.metadata}")
- logger.trace(s"Kernel message content: ${km.contentString}")
- }
-
- /**
- * Logs an action, along with message id and type for a KernelMessage.
- * @param action
- * @param km
- */
- def logKernelMessageAction(action: String, km: KernelMessage): Unit = {
- logger.debug(s"${action} KernelMessage ${km.header.msg_id} " +
- s"of type ${km.header.msg_type}")
- }
-
- // TODO: Migrate this to a helper method in MessageType.Incoming
- /**
- * This method is used to determine if a message is being received by the
- * kernel or being sent from the kernel.
- * @return true if the message is received by the kernel, false otherwise.
- */
- private def isIncomingMessage(messageType: String): Boolean ={
- MessageType.Incoming.CompleteRequest.toString.equals(messageType) ||
- MessageType.Incoming.ConnectRequest.toString.equals(messageType) ||
- MessageType.Incoming.ExecuteRequest.toString.equals(messageType) ||
- MessageType.Incoming.HistoryRequest.toString.equals(messageType) ||
- MessageType.Incoming.InspectRequest.toString.equals(messageType) ||
- MessageType.Incoming.ShutdownRequest.toString.equals(messageType)||
- MessageType.Incoming.KernelInfoRequest.toString.equals(messageType) ||
- MessageType.Incoming.CommOpen.toString.equals(messageType) ||
- MessageType.Incoming.CommMsg.toString.equals(messageType) ||
- MessageType.Incoming.CommClose.toString.equals(messageType)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/utils/json/RddToJson.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/utils/json/RddToJson.scala b/kernel/src/main/scala/com/ibm/spark/utils/json/RddToJson.scala
deleted file mode 100644
index 3439d0d..0000000
--- a/kernel/src/main/scala/com/ibm/spark/utils/json/RddToJson.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.utils.json
-
-import org.apache.spark.sql.{DataFrame, SchemaRDD}
-import play.api.libs.json.{JsObject, JsString, Json}
-
-/**
- * Utility to convert RDD to JSON.
- */
-object RddToJson {
-
- /**
- * Converts a SchemaRDD to a JSON table format.
- *
- * @param rdd The schema rdd (now a dataframe) to convert
- *
- * @return The resulting string representing the JSON
- */
- def convert(rdd: DataFrame, limit: Int = 10): String =
- JsObject(Seq(
- "type" -> JsString("rdd/schema"),
- "columns" -> Json.toJson(rdd.schema.fieldNames),
- "rows" -> Json.toJson(rdd.map(row =>
- row.toSeq.map(_.toString).toArray).take(limit))
- )).toString()
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/SparkKernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/SparkKernel.scala b/kernel/src/main/scala/org/apache/toree/SparkKernel.scala
new file mode 100644
index 0000000..f532de9
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/SparkKernel.scala
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark
+
+import com.ibm.spark.boot.layer._
+import com.ibm.spark.boot.{CommandLineOptions, KernelBootstrap}
+import com.ibm.spark.kernel.BuildInfo
+
+object SparkKernel extends App {
+ private val options = new CommandLineOptions(args)
+
+ if (options.help) {
+ options.printHelpOn(System.out)
+ } else if (options.version) {
+ println(s"Kernel Version: ${BuildInfo.version}")
+ println(s"Build Date: ${BuildInfo.buildDate}")
+ println(s"Scala Version: ${BuildInfo.scalaVersion}")
+ println(s"Apache Spark Version: ${BuildInfo.sparkVersion}")
+ } else {
+ (new KernelBootstrap(options.toConfig)
+ with StandardBareInitialization
+ with StandardComponentInitialization
+ with StandardHandlerInitialization
+ with StandardHookInitialization)
+ .initialize()
+ .waitForTermination()
+ .shutdown()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
new file mode 100644
index 0000000..a5acbc2
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.boot
+
+import java.io.{File, OutputStream}
+
+import com.ibm.spark.utils.KeyValuePairUtils
+import com.typesafe.config.{Config, ConfigFactory}
+import joptsimple.util.KeyValuePair
+import joptsimple.{OptionParser, OptionSpec}
+
+import scala.collection.JavaConverters._
+
+class CommandLineOptions(args: Seq[String]) {
+ private val parser = new OptionParser()
+ parser.allowsUnrecognizedOptions()
+
+ /*
+ * Options supported by our kernel.
+ */
+ private val _help =
+ parser.acceptsAll(Seq("help", "h").asJava, "display help information").forHelp()
+
+ private val _version =
+ parser.acceptsAll(Seq("version", "v").asJava, "display version information")
+
+ private val _profile =
+ parser.accepts("profile", "path to IPython JSON connection file")
+ .withRequiredArg().ofType(classOf[File])
+
+ private val _ip =
+ parser.accepts("ip", "ip used to bind sockets")
+ .withRequiredArg().ofType(classOf[String])
+
+ private val _stdin_port = parser.accepts(
+ "stdin-port", "port of the stdin socket"
+ ).withRequiredArg().ofType(classOf[Int])
+
+ private val _shell_port = parser.accepts(
+ "shell-port", "port of the shell socket"
+ ).withRequiredArg().ofType(classOf[Int])
+
+ private val _iopub_port = parser.accepts(
+ "iopub-port", "port of the iopub socket"
+ ).withRequiredArg().ofType(classOf[Int])
+
+ private val _control_port = parser.accepts(
+ "control-port", "port of the control socket"
+ ).withRequiredArg().ofType(classOf[Int])
+
+ private val _heartbeat_port = parser.accepts(
+ "heartbeat-port", "port of the heartbeat socket"
+ ).withRequiredArg().ofType(classOf[Int])
+
+ private val _spark_configuration = parser.acceptsAll(
+ Seq("spark-configuration", "S").asJava,
+ "configuration setting for Apache Spark"
+ ).withRequiredArg().ofType(classOf[KeyValuePair])
+
+ private val _magic_url =
+ parser.accepts("magic-url", "path to a magic jar")
+ .withRequiredArg().ofType(classOf[String])
+
+ private val _max_interpreter_threads = parser.accepts(
+ "max-interpreter-threads",
+ "total number of worker threads to use to execute code"
+ ).withRequiredArg().ofType(classOf[Int])
+
+ private val _jar_dir = parser.accepts(
+ "jar-dir",
+ "directory where user added jars are stored (MUST EXIST)"
+ ).withRequiredArg().ofType(classOf[String])
+
+ private val _default_interpreter =
+ parser.accepts("default-interpreter", "default interpreter for the kernel")
+ .withRequiredArg().ofType(classOf[String])
+
+ private val _nosparkcontext =
+ parser.accepts("nosparkcontext", "kernel should not create a spark context")
+
+ private val _interpreter_plugin = parser.accepts(
+ "interpreter-plugin"
+ ).withRequiredArg().ofType(classOf[String])
+
+ private val options = parser.parse(args.map(_.trim): _*)
+
+ /*
+ * Helpers to determine if an option is provided and the value with which it
+ * was provided.
+ */
+
+ private def has[T](spec: OptionSpec[T]): Boolean =
+ options.has(spec)
+
+ private def get[T](spec: OptionSpec[T]): Option[T] =
+ Some(options.valueOf(spec)).filter(_ != null)
+
+ private def getAll[T](spec: OptionSpec[T]): Option[List[T]] =
+ Some(options.valuesOf(spec).asScala.toList).filter(_ != null)
+
+ /*
+ * Expose options in terms of their existence/value.
+ */
+
+ val help: Boolean = has(_help)
+
+ val version: Boolean = has(_version)
+
+ /*
+ * Config object has 3 levels and fallback in this order
+ * 1. Comandline Args
+ * 2. --profile file
+ * 3. Defaults
+ */
+ def toConfig: Config = {
+ val profileConfig: Config = get(_profile) match {
+ case Some(x) =>
+ ConfigFactory.parseFile(x)
+ case None =>
+ ConfigFactory.empty()
+ }
+
+ val commandLineConfig: Config = ConfigFactory.parseMap(Map(
+ "stdin_port" -> get(_stdin_port),
+ "shell_port" -> get(_shell_port),
+ "iopub_port" -> get(_iopub_port),
+ "control_port" -> get(_control_port),
+ "hb_port" -> get(_heartbeat_port),
+ "ip" -> get(_ip),
+ "interpreter_args" -> interpreterArgs,
+ "magic_urls" -> getAll(_magic_url).map(_.asJava)
+ .flatMap(list => if (list.isEmpty) None else Some(list)),
+ "spark_configuration" -> getAll(_spark_configuration)
+ .map(list => KeyValuePairUtils.keyValuePairSeqToString(list))
+ .flatMap(str => if (str.nonEmpty) Some(str) else None),
+ "max_interpreter_threads" -> get(_max_interpreter_threads),
+ "jar_dir" -> get(_jar_dir),
+ "default_interpreter" -> get(_default_interpreter),
+ "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false)),
+ "interpreter_plugins" -> interpreterPlugins
+ ).flatMap(removeEmptyOptions).asInstanceOf[Map[String, AnyRef]].asJava)
+
+ commandLineConfig.withFallback(profileConfig).withFallback(ConfigFactory.load)
+ }
+
+ private val removeEmptyOptions: ((String, Option[Any])) => Iterable[(String, Any)] = {
+ pair => if (pair._2.isDefined) Some((pair._1, pair._2.get)) else None
+ }
+
+ /**
+ *
+ * @return
+ */
+ private def interpreterArgs: Option[java.util.List[String]] = {
+ args.dropWhile(_ != "--").drop(1).toList match {
+ case Nil => None
+ case list: List[String] => Some(list.asJava)
+ }
+ }
+
+ private def interpreterPlugins: Option[java.util.List[String]] = {
+ //val defaults = getAll(_default_interpreter_plugin).getOrElse(List())
+ //val defaults = List[String](
+ // "PySpark:com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter",
+ // "SparkR:com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter",
+ // "SQL:com.ibm.spark.kernel.interpreter.sql.SqlInterpreter"
+ //)
+
+ val userDefined = getAll(_interpreter_plugin) match {
+ case Some(l) => l
+ case _ => List[String]()
+ }
+
+ //val p = defaults ++ userDefined
+ Some(userDefined.asJava)
+ }
+
+ /**
+ * Prints the help message to the output stream provided.
+ * @param out The output stream to direct the help message
+ */
+ def printHelpOn(out: OutputStream) =
+ parser.printHelpOn(out)
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala b/kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala
new file mode 100644
index 0000000..1e7927c
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.boot
+
+import akka.actor.{ActorRef, ActorSystem}
+import com.ibm.spark.boot.layer._
+import com.ibm.spark.interpreter.Interpreter
+import com.ibm.spark.kernel.api.Kernel
+import com.ibm.spark.kernel.protocol.v5.KernelStatusType._
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.security.KernelSecurityManager
+import com.ibm.spark.utils.LogLike
+import com.typesafe.config.Config
+import org.apache.spark.SparkContext
+import org.zeromq.ZMQ
+
+import scala.util.Try
+
+class KernelBootstrap(config: Config) extends LogLike {
+ this: BareInitialization with ComponentInitialization
+ with HandlerInitialization with HookInitialization =>
+
+ private val DefaultAppName = SparkKernelInfo.banner
+ private val DefaultActorSystemName = "spark-kernel-actor-system"
+
+ private var actorSystem: ActorSystem = _
+ private var actorLoader: ActorLoader = _
+ private var kernelMessageRelayActor: ActorRef = _
+ private var statusDispatch: ActorRef = _
+ private var kernel: Kernel = _
+
+ private var sparkContext: SparkContext = _
+ private var interpreters: Seq[Interpreter] = Nil
+
+ /**
+ * Initializes all kernel systems.
+ */
+ def initialize() = {
+ // TODO: Investigate potential to initialize System out/err/in to capture
+ // Console DynamicVariable initialization (since takes System fields)
+ // and redirect it to a workable location (like an actor) with the
+ // thread's current information attached
+ //
+ // E.G. System.setOut(customPrintStream) ... all new threads will have
+ // customPrintStream as their initial Console.out value
+ //
+
+ displayVersionInfo()
+
+ // Do this first to support shutting down quickly before entire system
+ // is ready
+ initializeShutdownHook()
+
+ // Initialize the bare minimum to report a starting message
+ val (actorSystem, actorLoader, kernelMessageRelayActor, statusDispatch) =
+ initializeBare(
+ config = config,
+ actorSystemName = DefaultActorSystemName
+ )
+
+ this.actorSystem = actorSystem
+ this.actorLoader = actorLoader
+ this.kernelMessageRelayActor = kernelMessageRelayActor
+ this.statusDispatch = statusDispatch
+
+ // Indicate that the kernel is now starting
+ publishStatus(KernelStatusType.Starting)
+
+ // Initialize components needed elsewhere
+ val (commStorage, commRegistrar, commManager, interpreter,
+ kernel, dependencyDownloader,
+ magicLoader, responseMap) =
+ initializeComponents(
+ config = config,
+ appName = DefaultAppName,
+ actorLoader = actorLoader
+ )
+ //this.sparkContext = sparkContext
+ this.interpreters ++= Seq(interpreter)
+
+ this.kernel = kernel
+
+ // Initialize our handlers that take care of processing messages
+ initializeHandlers(
+ actorSystem = actorSystem,
+ actorLoader = actorLoader,
+ kernel = kernel,
+ interpreter = interpreter,
+ commRegistrar = commRegistrar,
+ commStorage = commStorage,
+ magicLoader = magicLoader,
+ responseMap = responseMap
+ )
+
+ // Initialize our non-shutdown hooks that handle various JVM events
+ initializeHooks(
+ interpreter = interpreter
+ )
+
+ logger.debug("Initializing security manager")
+ System.setSecurityManager(new KernelSecurityManager)
+
+ logger.info("Marking relay as ready for receiving messages")
+ kernelMessageRelayActor ! true
+
+ this
+ }
+
+ /**
+ * Shuts down all kernel systems.
+ */
+ def shutdown() = {
+ logger.info("Shutting down Spark Context")
+ Try(kernel.sparkContext.stop()).failed.foreach(
+ logger.error("Failed to shutdown Spark Context", _: Throwable)
+ )
+
+ logger.info("Shutting down interpreters")
+ Try(interpreters.foreach(_.stop())).failed.foreach(
+ logger.error("Failed to shutdown interpreters", _: Throwable)
+ )
+
+ logger.info("Shutting down actor system")
+ Try(actorSystem.shutdown()).failed.foreach(
+ logger.error("Failed to shutdown actor system", _: Throwable)
+ )
+
+ this
+ }
+
+ /**
+ * Waits for the main actor system to terminate.
+ */
+ def waitForTermination() = {
+ logger.debug("Waiting for actor system to terminate")
+ actorSystem.awaitTermination()
+
+ this
+ }
+
+ private def publishStatus(
+ status: KernelStatusType,
+ parentHeader: Option[ParentHeader] = None
+ ): Unit = {
+ parentHeader match {
+ case Some(header) => statusDispatch ! ((status, header))
+ case None => statusDispatch ! status
+ }
+ }
+
+ @inline private def displayVersionInfo() = {
+ logger.info("Kernel version: " + SparkKernelInfo.implementationVersion)
+ logger.info("Scala version: " + SparkKernelInfo.languageVersion)
+ logger.info("ZeroMQ (JeroMQ) version: " + ZMQ.getVersionString)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/boot/layer/BareInitialization.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/BareInitialization.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/BareInitialization.scala
new file mode 100644
index 0000000..d2d6ab9
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/boot/layer/BareInitialization.scala
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.boot.layer
+
+import akka.actor.{ActorRef, Props, ActorSystem}
+import com.ibm.spark.kernel.protocol.v5.dispatch.StatusDispatch
+import com.ibm.spark.kernel.protocol.v5.handler.{GenericSocketMessageHandler, ShutdownHandler}
+import com.ibm.spark.kernel.protocol.v5.kernel.{SimpleActorLoader, ActorLoader}
+import com.ibm.spark.communication.security.{SecurityActorType, SignatureManagerActor}
+import com.ibm.spark.kernel.protocol.v5.kernel.socket._
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.content.{CommClose, CommMsg, CommOpen}
+import com.ibm.spark.kernel.protocol.v5.relay.KernelMessageRelay
+import com.ibm.spark.utils.LogLike
+import com.typesafe.config.Config
+import play.api.libs.json.Json
+
+/**
+ * Represents the raw initialization needed to send a "starting" message.
+ */
+trait BareInitialization {
+ /**
+ * Initializes and registers all objects needed to get the kernel to send a
+ * "starting" message.
+ *
+ * @param config The config used for initialization
+ * @param actorSystemName The name to use for the actor system
+ */
+ def initializeBare(config: Config, actorSystemName: String):
+ (ActorSystem, ActorLoader, ActorRef, ActorRef)
+}
+
+/**
+ * Represents the standard implementation of BareInitialization.
+ */
+trait StandardBareInitialization extends BareInitialization { this: LogLike =>
+ /**
+ * Initializes and registers all objects needed to get the kernel to send a
+ * "starting" message.
+ *
+ * @param config The config used for initialization
+ * @param actorSystemName The name to use for the actor system
+ */
+ def initializeBare(config: Config, actorSystemName: String) = {
+ val actorSystem = createActorSystem(actorSystemName)
+ val actorLoader = createActorLoader(actorSystem)
+ val (kernelMessageRelayActor, _, statusDispatch, _, _) =
+ initializeCoreActors(config, actorSystem, actorLoader)
+ createSockets(config, actorSystem, actorLoader)
+
+ (actorSystem, actorLoader, kernelMessageRelayActor, statusDispatch)
+ }
+
+ protected def createActorSystem(actorSystemName: String): ActorSystem = {
+ logger.info("Initializing internal actor system")
+ ActorSystem(actorSystemName)
+ }
+
+ protected def createActorLoader(actorSystem: ActorSystem): ActorLoader = {
+ logger.debug("Creating Simple Actor Loader")
+ SimpleActorLoader(actorSystem)
+ }
+
+ /**
+ * Does minimal setup in order to send the "starting" status message over
+ * the IOPub socket
+ */
+ protected def initializeCoreActors(
+ config: Config, actorSystem: ActorSystem, actorLoader: ActorLoader
+ ) = {
+ logger.debug("Creating kernel message relay actor")
+ val kernelMessageRelayActor = actorSystem.actorOf(
+ Props(
+ classOf[KernelMessageRelay], actorLoader, true,
+ Map(
+ CommOpen.toTypeString -> MessageType.Incoming.CommOpen.toString,
+ CommMsg.toTypeString -> MessageType.Incoming.CommMsg.toString,
+ CommClose.toTypeString -> MessageType.Incoming.CommClose.toString
+ ),
+ Map(
+ CommOpen.toTypeString -> MessageType.Outgoing.CommOpen.toString,
+ CommMsg.toTypeString -> MessageType.Outgoing.CommMsg.toString,
+ CommClose.toTypeString -> MessageType.Outgoing.CommClose.toString
+ )
+ ),
+ name = SystemActorType.KernelMessageRelay.toString
+ )
+
+ logger.debug("Creating signature manager actor")
+ val sigKey = config.getString("key")
+ val sigScheme = config.getString("signature_scheme")
+ logger.debug("Key = " + sigKey)
+ logger.debug("Scheme = " + sigScheme)
+ val signatureManagerActor = actorSystem.actorOf(
+ Props(
+ classOf[SignatureManagerActor], sigKey, sigScheme.replace("-", "")
+ ),
+ name = SecurityActorType.SignatureManager.toString
+ )
+
+ logger.debug("Creating status dispatch actor")
+ val statusDispatch = actorSystem.actorOf(
+ Props(classOf[StatusDispatch], actorLoader),
+ name = SystemActorType.StatusDispatch.toString
+ )
+
+ logger.debug("Creating shutdown handler and sender actors")
+ val shutdownHandler = actorSystem.actorOf(
+ Props(classOf[ShutdownHandler], actorLoader),
+ name = MessageType.Incoming.ShutdownRequest.toString
+ )
+ val shutdownSender = actorSystem.actorOf(
+ Props(classOf[GenericSocketMessageHandler], actorLoader, SocketType.Control),
+ name = MessageType.Outgoing.ShutdownReply.toString
+ )
+
+ (kernelMessageRelayActor, signatureManagerActor, statusDispatch, shutdownHandler, shutdownSender)
+ }
+
+ protected def createSockets(
+ config: Config, actorSystem: ActorSystem, actorLoader: ActorLoader
+ ): Unit = {
+ logger.debug("Creating sockets")
+
+ val socketConfig: SocketConfig = SocketConfig.fromConfig(config)
+ logger.info("Connection Profile: "
+ + Json.prettyPrint(Json.toJson(socketConfig)))
+
+ logger.debug("Constructing ServerSocketFactory")
+ val socketFactory = new SocketFactory(socketConfig)
+
+ logger.debug("Initializing Heartbeat on port " +
+ socketConfig.hb_port)
+ val heartbeatActor = actorSystem.actorOf(
+ Props(classOf[Heartbeat], socketFactory),
+ name = SocketType.Heartbeat.toString
+ )
+
+ logger.debug("Initializing Stdin on port " +
+ socketConfig.stdin_port)
+ val stdinActor = actorSystem.actorOf(
+ Props(classOf[Stdin], socketFactory, actorLoader),
+ name = SocketType.StdIn.toString
+ )
+
+ logger.debug("Initializing Shell on port " +
+ socketConfig.shell_port)
+ val shellActor = actorSystem.actorOf(
+ Props(classOf[Shell], socketFactory, actorLoader),
+ name = SocketType.Shell.toString
+ )
+
+ logger.debug("Initializing Control on port " +
+ socketConfig.control_port)
+ val controlActor = actorSystem.actorOf(
+ Props(classOf[Control], socketFactory, actorLoader),
+ name = SocketType.Control.toString
+ )
+
+ logger.debug("Initializing IOPub on port " +
+ socketConfig.iopub_port)
+ val ioPubActor = actorSystem.actorOf(
+ Props(classOf[IOPub], socketFactory),
+ name = SocketType.IOPub.toString
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
new file mode 100644
index 0000000..939b896
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.boot.layer
+
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+
+import akka.actor.ActorRef
+import com.ibm.spark.comm.{CommManager, KernelCommManager, CommRegistrar, CommStorage}
+import com.ibm.spark.dependencies.{DependencyDownloader, IvyDependencyDownloader}
+import com.ibm.spark.global
+import com.ibm.spark.interpreter._
+import com.ibm.spark.kernel.api.{KernelLike, Kernel}
+import com.ibm.spark.kernel.protocol.v5.KMBuilder
+import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.stream.KernelOutputStream
+import com.ibm.spark.magic.MagicLoader
+import com.ibm.spark.magic.builtin.BuiltinLoader
+import com.ibm.spark.magic.dependencies.DependencyMap
+import com.ibm.spark.utils.{MultiClassLoader, TaskManager, KeyValuePairUtils, LogLike}
+import com.typesafe.config.Config
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkContext, SparkConf}
+
+import scala.collection.JavaConverters._
+
+import scala.util.Try
+
+/**
+ * Represents the component initialization. All component-related pieces of the
+ * kernel (non-actors) should be created here. Limited items should be exposed.
+ */
+trait ComponentInitialization {
+ /**
+ * Initializes and registers all components (not needed by bare init).
+ *
+ * @param config The config used for initialization
+ * @param appName The name of the "application" for Spark
+ * @param actorLoader The actor loader to use for some initialization
+ */
+ def initializeComponents(
+ config: Config, appName: String, actorLoader: ActorLoader
+ ): (CommStorage, CommRegistrar, CommManager, Interpreter,
+ Kernel, DependencyDownloader, MagicLoader,
+ collection.mutable.Map[String, ActorRef])
+}
+
+/**
+ * Represents the standard implementation of ComponentInitialization.
+ */
+trait StandardComponentInitialization extends ComponentInitialization {
+ this: LogLike =>
+
+ /**
+ * Initializes and registers all components (not needed by bare init).
+ *
+ * @param config The config used for initialization
+ * @param appName The name of the "application" for Spark
+ * @param actorLoader The actor loader to use for some initialization
+ */
+ def initializeComponents(
+ config: Config, appName: String, actorLoader: ActorLoader
+ ) = {
+ val (commStorage, commRegistrar, commManager) =
+ initializeCommObjects(actorLoader)
+
+ val manager = InterpreterManager(config)
+ val scalaInterpreter = manager.interpreters.get("Scala").orNull
+
+ val dependencyDownloader = initializeDependencyDownloader(config)
+ val magicLoader = initializeMagicLoader(
+ config, scalaInterpreter, dependencyDownloader)
+
+ val kernel = initializeKernel(
+ config, actorLoader, manager, commManager, magicLoader
+ )
+
+ val responseMap = initializeResponseMap()
+
+ initializeSparkContext(config, kernel, appName)
+
+ (commStorage, commRegistrar, commManager,
+ manager.defaultInterpreter.orNull, kernel,
+ dependencyDownloader, magicLoader, responseMap)
+
+ }
+
+
+ def initializeSparkContext(config:Config, kernel:Kernel, appName:String) = {
+ if(!config.getBoolean("nosparkcontext")) {
+ kernel.createSparkContext(config.getString("spark.master"), appName)
+ }
+ }
+
+ private def initializeCommObjects(actorLoader: ActorLoader) = {
+ logger.debug("Constructing Comm storage")
+ val commStorage = new CommStorage()
+
+ logger.debug("Constructing Comm registrar")
+ val commRegistrar = new CommRegistrar(commStorage)
+
+ logger.debug("Constructing Comm manager")
+ val commManager = new KernelCommManager(
+ actorLoader, KMBuilder(), commRegistrar)
+
+ (commStorage, commRegistrar, commManager)
+ }
+
+ private def initializeDependencyDownloader(config: Config) = {
+ val dependencyDownloader = new IvyDependencyDownloader(
+ "http://repo1.maven.org/maven2/", config.getString("ivy_local")
+ )
+
+ dependencyDownloader
+ }
+
+ protected def initializeResponseMap(): collection.mutable.Map[String, ActorRef] =
+ new ConcurrentHashMap[String, ActorRef]().asScala
+
+ private def initializeKernel(
+ config: Config,
+ actorLoader: ActorLoader,
+ interpreterManager: InterpreterManager,
+ commManager: CommManager,
+ magicLoader: MagicLoader
+ ) = {
+ val kernel = new Kernel(
+ config,
+ actorLoader,
+ interpreterManager,
+ commManager,
+ magicLoader
+ )
+ /*
+ interpreter.doQuietly {
+ interpreter.bind(
+ "kernel", "com.ibm.spark.kernel.api.Kernel",
+ kernel, List( """@transient implicit""")
+ )
+ }
+ */
+ magicLoader.dependencyMap.setKernel(kernel)
+
+ kernel
+ }
+
+ private def initializeMagicLoader(
+ config: Config, interpreter: Interpreter,
+ dependencyDownloader: DependencyDownloader
+ ) = {
+ logger.debug("Constructing magic loader")
+
+ logger.debug("Building dependency map")
+ val dependencyMap = new DependencyMap()
+ .setInterpreter(interpreter)
+ .setKernelInterpreter(interpreter) // This is deprecated
+ .setDependencyDownloader(dependencyDownloader)
+ .setConfig(config)
+
+ logger.debug("Creating BuiltinLoader")
+ val builtinLoader = new BuiltinLoader()
+
+ val magicUrlArray = config.getStringList("magic_urls").asScala
+ .map(s => new java.net.URL(s)).toArray
+
+ if (magicUrlArray.isEmpty)
+ logger.warn("No external magics provided to MagicLoader!")
+ else
+ logger.info("Using magics from the following locations: " +
+ magicUrlArray.map(_.getPath).mkString(","))
+
+ val multiClassLoader = new MultiClassLoader(
+ builtinLoader,
+ interpreter.classLoader
+ )
+
+ logger.debug("Creating MagicLoader")
+ val magicLoader = new MagicLoader(
+ dependencyMap = dependencyMap,
+ urls = magicUrlArray,
+ parentLoader = multiClassLoader
+ )
+ magicLoader.dependencyMap.setMagicLoader(magicLoader)
+ magicLoader
+ }
+}