You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:23 UTC
[15/51] [abbrv] incubator-toree git commit: Moved scala files to new
locations based on new package
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/LSMagic.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/LSMagic.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/LSMagic.scala
new file mode 100644
index 0000000..db99cc1
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/LSMagic.scala
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import java.io.PrintStream
+
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.dependencies.IncludeOutputStream
+
+/**
+ * Line magic that prints the names of the builtin line and cell magics.
+ */
+class LSMagic extends LineMagic with IncludeOutputStream {
+
+  // Wraps `outputStream` in a PrintStream the first time it is needed.
+  private lazy val printStream = new PrintStream(outputStream)
+
+  /**
+   * Lists all available magics.
+   *
+   * Classes are obtained from a fresh BuiltinLoader; line magics are shown
+   * with a "%" prefix and cell magics with a "%%" prefix, lower-cased and
+   * separated by single spaces. The listing is written to the print stream.
+   *
+   * @param code The single line of code (not inspected by this magic)
+   */
+  override def execute(code: String): Unit = {
+    val builtinClasses = new BuiltinLoader().loadClasses().toList
+
+    // Renders one space-separated, lower-cased listing for a magic kind.
+    def listing(prefix: String, interface: Class[_]): String =
+      magicNames(prefix, interface, builtinClasses).mkString(" ").toLowerCase
+
+    val lineMagics = listing("%", classOf[LineMagic])
+    val cellMagics = listing("%%", classOf[CellMagic])
+
+    // The last margin line carries a single space so the printed text ends
+    // with a whitespace-only line, exactly as this message always has.
+    val message =
+      s"""|Available line magics:
+          |$lineMagics
+          |
+          |Available cell magics:
+          |$cellMagics
+          |
+          |Type %<magic_name> for usage info.
+          | """.stripMargin
+
+    printStream.println(message)
+  }
+
+  /**
+   * Provides the simple names of the given classes that directly declare
+   * the specified interface, each with the given prefix prepended.
+   *
+   * Only interfaces reported by `Class.getInterfaces` are considered, i.e.
+   * those implemented directly by the class itself.
+   *
+   * @param prefix prepended to each name, e.g. "%%"
+   * @param interface a magic interface, e.g. classOf[LineMagic]
+   * @param classes a list of magic classes
+   * @return list of class names with prefix, in the order of `classes`
+   */
+  protected[magic] def magicNames(prefix: String, interface: Class[_],
+                                  classes: List[Class[_]]) : List[String] =
+    classes.collect {
+      case c if c.getInterfaces.contains(interface) =>
+        s"${prefix}${c.getSimpleName}"
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala
new file mode 100644
index 0000000..dbee517
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.magic.builtin
+
+import com.ibm.spark.interpreter.{ExecuteFailure, Results, ExecuteAborted, ExecuteError}
+import com.ibm.spark.kernel.protocol.v5.MIMEType
+import com.ibm.spark.magic._
+import com.ibm.spark.magic.dependencies.{IncludeKernelInterpreter, IncludeInterpreter}
+import com.ibm.spark.utils.LogLike
+import com.ibm.spark.utils.json.RddToJson
+import org.apache.spark.sql.SchemaRDD
+
+/**
+ * Temporary magic to show an RDD as JSON.
+ *
+ * The cell's code is run through the kernel interpreter; the value of the
+ * last execution variable is then read back and converted with RddToJson.
+ */
+class RDD extends CellMagic with IncludeKernelInterpreter with LogLike {
+
+  /**
+   * Interprets the code and renders the resulting value as JSON.
+   *
+   * @param code The cell contents to interpret
+   * @return JSON output on success; otherwise a plain-text output describing
+   *         why no JSON could be produced
+   * @throws Exception if the interpreter reports an aborted execution or an
+   *                   execution error
+   */
+  private def convertToJson(code: String) = {
+    val (result, message) = kernelInterpreter.interpret(code)
+    result match {
+      case Results.Success =>
+        val rddVarName = kernelInterpreter.lastExecutionVariableName.getOrElse("")
+        kernelInterpreter.read(rddVarName).map(rddVal => {
+          try {
+            CellMagicOutput(MIMEType.ApplicationJson -> RddToJson.convert(rddVal.asInstanceOf[SchemaRDD]))
+          } catch {
+            // Only recover from non-fatal failures (bad cast, conversion
+            // errors); fatal JVM errors must keep propagating.
+            case scala.util.control.NonFatal(e) =>
+              logger.error(s"Could not convert RDD to JSON: ${e.getMessage}")
+              CellMagicOutput(MIMEType.PlainText -> s"Could not convert RDD to JSON: ${rddVarName}->${rddVal}")
+          }
+        }).getOrElse(CellMagicOutput(MIMEType.PlainText -> "No RDD Value found!"))
+      case _ =>
+        // A failure on the right side is rethrown as an exception; only the
+        // "no failure details" case yields a message to report below.
+        val errorMessage = message.right.toOption match {
+          case Some(executeFailure) => executeFailure match {
+            case _: ExecuteAborted => throw new Exception("RDD magic aborted!")
+            case executeError: ExecuteError => throw new Exception(executeError.value)
+          }
+          case _ => "No error information available!"
+        }
+        logger.error(s"Error retrieving RDD value: ${errorMessage}")
+        CellMagicOutput(MIMEType.PlainText ->
+          (s"An error occurred converting RDD to JSON.\n${errorMessage}"))
+    }
+  }
+
+  /**
+   * Runs the cell's code and returns its last value as a JSON table.
+   *
+   * @param code The cell contents to interpret
+   * @return The output of the magic
+   */
+  override def execute(code: String): CellMagicOutput =
+    convertToJson(code)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/ShowTypes.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/ShowTypes.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/ShowTypes.scala
new file mode 100644
index 0000000..47d4f65
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/ShowTypes.scala
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.magic.builtin
+
+import com.ibm.spark.magic.LineMagic
+import com.ibm.spark.magic.dependencies.IncludeOutputStream
+import java.io.PrintStream
+import com.ibm.spark.kernel.api.KernelOptions
+
+
+/**
+ * Line magic that switches `KernelOptions.showTypes` on or off, or reports
+ * its current state when invoked without an argument.
+ */
+class ShowTypes extends LineMagic with IncludeOutputStream {
+  // Wraps `outputStream` in a PrintStream the first time it is needed.
+  private lazy val printStream = new PrintStream(outputStream)
+
+  /**
+   * Applies the requested option.
+   *
+   * @param code "on", "off", or empty to query the current state; surrounding
+   *             whitespace is ignored
+   */
+  override def execute(code: String): Unit = {
+    // Ignore surrounding whitespace so that e.g. "on " is accepted; a null
+    // argument is left as-is and ends up in the invalid-option branch.
+    val option = Option(code).map(_.trim).orNull
+    option match {
+      case "on" =>
+        printStream.println("Types will be printed.")
+        KernelOptions.showTypes = true
+      case "off" =>
+        printStream.println("Types will not be printed")
+        KernelOptions.showTypes = false
+      case "" =>
+        printStream.println(s"ShowTypes is currently ${if (KernelOptions.showTypes) "on" else "off"} ")
+      case other =>
+        printStream.println(s"${other} is not a valid option for the ShowTypes magic.")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/magic/builtin/Truncation.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/Truncation.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/Truncation.scala
new file mode 100644
index 0000000..d30736e
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/Truncation.scala
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.magic.builtin
+
+import com.ibm.spark.magic.LineMagic
+import com.ibm.spark.magic.dependencies.IncludeOutputStream
+import java.io.PrintStream
+import com.ibm.spark.kernel.api.KernelOptions
+
+
+/**
+ * Line magic that controls output truncation by setting
+ * `KernelOptions.noTruncation`, or reports the current state when invoked
+ * without an argument.
+ */
+class Truncation extends LineMagic with IncludeOutputStream {
+  // Wraps `outputStream` in a PrintStream the first time it is needed.
+  private lazy val printStream = new PrintStream(outputStream)
+
+  /**
+   * Applies the requested option.
+   *
+   * Note the inversion: turning truncation "on" clears `noTruncation`, and
+   * turning it "off" sets `noTruncation`.
+   *
+   * @param code "on", "off", or empty to query the current state; surrounding
+   *             whitespace is ignored
+   */
+  override def execute(code: String): Unit = {
+    // Ignore surrounding whitespace so that e.g. "off " is accepted; a null
+    // argument is left as-is and ends up in the invalid-option branch.
+    val option = Option(code).map(_.trim).orNull
+    option match {
+      case "on" =>
+        printStream.println("Output WILL be truncated.")
+        KernelOptions.noTruncation = false
+      case "off" =>
+        printStream.println("Output will NOT be truncated")
+        KernelOptions.noTruncation = true
+      case "" =>
+        printStream.println(s"Truncation is currently ${if (KernelOptions.noTruncation) "off" else "on"} ")
+      case other =>
+        // Report the magic under its actual name (this class is Truncation).
+        printStream.println(s"${other} is not a valid option for the Truncation magic.")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/utils/MessageLogSupport.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/utils/MessageLogSupport.scala b/kernel/src/main/scala/org/apache/toree/utils/MessageLogSupport.scala
new file mode 100644
index 0000000..05c2216
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/utils/MessageLogSupport.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils
+
+import com.ibm.spark.kernel.protocol.v5.{MessageType, KernelMessage}
+
+/**
+ * Mix-in that adds logging helpers for KernelMessage instances on top of
+ * the logger provided by LogLike.
+ */
+trait MessageLogSupport extends LogLike {
+  /**
+   * Logs the individual parts of a KernelMessage: ids, signature, parent
+   * header, metadata and content at trace level, header id and type at debug
+   * level. A missing parent header is reported at warn level unless the
+   * message type is one the kernel receives.
+   *
+   * @param km The kernel message to log
+   */
+  def logMessage(km: KernelMessage): Unit = {
+    logger.trace(s"Kernel message ids: ${km.ids}")
+    logger.trace(s"Kernel message signature: ${km.signature}")
+
+    val header = km.header
+    logger.debug(s"Kernel message header id: ${header.msg_id}")
+    logger.debug(s"Kernel message header type: ${header.msg_type}")
+
+    val receivedByKernel = isIncomingMessage(header.msg_type)
+    val parent = km.parentHeader
+    if (parent != null) {
+      logger.trace(s"Kernel message parent id: ${parent.msg_id}")
+      logger.trace(s"Kernel message parent type: ${parent.msg_type}")
+    } else if (!receivedByKernel) {
+      // Messages coming from the kernel should have parent headers
+      logger.warn(s"Parent header is null for message ${header.msg_id} " +
+        s"of type ${header.msg_type}")
+    }
+    // A null parent on an incoming message is expected, so nothing is logged.
+
+    logger.trace(s"Kernel message metadata: ${km.metadata}")
+    logger.trace(s"Kernel message content: ${km.contentString}")
+  }
+
+  /**
+   * Logs, at debug level, an action together with the id and type taken
+   * from the message header.
+   *
+   * @param action Text placed in front of the message description
+   * @param km The kernel message the action refers to
+   */
+  def logKernelMessageAction(action: String, km: KernelMessage): Unit = {
+    val header = km.header
+    logger.debug(s"${action} KernelMessage ${header.msg_id} " +
+      s"of type ${header.msg_type}")
+  }
+
+  // TODO: Migrate this to a helper method in MessageType.Incoming
+  /**
+   * Determines whether a message type denotes a message received by the
+   * kernel, by comparing it with the string form of a fixed list of
+   * MessageType.Incoming values.
+   *
+   * @param messageType The message type taken from a message header
+   * @return true if the message is received by the kernel, false otherwise.
+   */
+  private def isIncomingMessage(messageType: String): Boolean = {
+    val incoming = MessageType.Incoming
+    Seq(
+      incoming.CompleteRequest,
+      incoming.ConnectRequest,
+      incoming.ExecuteRequest,
+      incoming.HistoryRequest,
+      incoming.InspectRequest,
+      incoming.ShutdownRequest,
+      incoming.KernelInfoRequest,
+      incoming.CommOpen,
+      incoming.CommMsg,
+      incoming.CommClose
+    ).exists(_.toString.equals(messageType))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala b/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala
new file mode 100644
index 0000000..3439d0d
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils.json
+
+import org.apache.spark.sql.{DataFrame, SchemaRDD}
+import play.api.libs.json.{JsObject, JsString, Json}
+
+/**
+ * Utility to convert RDD to JSON.
+ */
+object RddToJson {
+
+  /**
+   * Converts a SchemaRDD to a JSON table format.
+   *
+   * The result is a JSON object with three fields: "type" (always
+   * "rdd/schema"), "columns" (the schema's field names) and "rows" (up to
+   * `limit` rows, every cell rendered as a string).
+   *
+   * @param rdd The schema rdd (now a dataframe) to convert
+   * @param limit The maximum number of rows to include (default 10)
+   *
+   * @return The resulting string representing the JSON
+   */
+  def convert(rdd: DataFrame, limit: Int = 10): String =
+    JsObject(Seq(
+      "type" -> JsString("rdd/schema"),
+      "columns" -> Json.toJson(rdd.schema.fieldNames),
+      "rows" -> Json.toJson(rdd.map(row =>
+        // Cells may be null (SQL NULL); render them as "null" rather than
+        // failing the whole conversion with a NullPointerException.
+        row.toSeq.map(cell => if (cell == null) "null" else cell.toString)
+          .toArray).take(limit))
+    )).toString()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/boot/CommandLineOptionsSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/boot/CommandLineOptionsSpec.scala b/kernel/src/test/scala/com/ibm/spark/boot/CommandLineOptionsSpec.scala
deleted file mode 100644
index 703d677..0000000
--- a/kernel/src/test/scala/com/ibm/spark/boot/CommandLineOptionsSpec.scala
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.boot
-
-import java.io.File
-
-import com.typesafe.config.Config
-import joptsimple.OptionException
-import org.scalatest.{FunSpec, Matchers}
-
-import scala.collection.JavaConverters._
-
-class CommandLineOptionsSpec extends FunSpec with Matchers {
-
- describe("CommandLineOptions") {
- describe("when received --max-interpreter-threads=<int>") {
- it("should set the configuration to the specified value") {
- val expected = 999
- val options = new CommandLineOptions(
- s"--max-interpreter-threads=$expected" :: Nil
- )
-
- val actual = options.toConfig.getInt("max_interpreter_threads")
-
- actual should be (expected)
- }
- }
-
- describe("when received --help") {
- it("should set the help flag to true") {
- val options = new CommandLineOptions("--help" :: Nil)
-
- options.help should be (true)
- }
- }
-
- describe("when received -h") {
- it("should set the help flag to true") {
- val options = new CommandLineOptions("-h" :: Nil)
-
- options.help should be (true)
- }
- }
-
- describe("when not received --help or -h") {
- it("should set the help flag to false") {
- val options = new CommandLineOptions(Nil)
-
- options.help should be (false)
- }
- }
-
- describe("when received --version") {
- it("should set the version flag to true") {
- val options = new CommandLineOptions("--version" :: Nil)
-
- options.version should be (true)
- }
- }
-
- describe("when received -v") {
- it("should set the version flag to true") {
- val options = new CommandLineOptions("-v" :: Nil)
-
- options.version should be (true)
- }
- }
-
- describe("when not received --version or -v") {
- it("should set the version flag to false") {
- val options = new CommandLineOptions(Nil)
-
- options.version should be (false)
- }
- }
-
- describe("when received --spark-conf=<key>=<value>") {
- it("should add the key-value pair to the string representation") {
- val expected = "key=value"
- val options = new CommandLineOptions(s"--spark-conf=$expected" :: Nil)
- val actual = options.toConfig.getString("spark_configuration")
-
- actual should be (expected)
- }
-
- it("should append to existing command line key-value pairs") {
- val expected = "key1=value1" :: "key2=value2" :: Nil
- val options = new CommandLineOptions(
- s"--spark-conf=${expected(0)}" ::
- s"--spark-conf=${expected(1)}" ::
- Nil
- )
- val actual = options.toConfig.getString("spark_configuration")
-
- actual should be (expected.mkString(","))
- }
- }
-
- describe("when received -S<key>=<value>") {
- it("should add the key-value pair to the string representation") {
- val expected = "key=value"
- val options = new CommandLineOptions(s"-S$expected" :: Nil)
- val actual = options.toConfig.getString("spark_configuration")
-
- actual should be(expected)
- }
-
- it("should append to existing command line key-value pairs") {
- val expected = "key1=value1" :: "key2=value2" :: Nil
- val options = new CommandLineOptions(
- s"-S${expected(0)}" ::
- s"-S${expected(1)}" ::
- Nil
- )
- val actual = options.toConfig.getString("spark_configuration")
-
- actual should be (expected.mkString(","))
- }
- }
-
- describe("when received --profile=<path>") {
- it("should error if path is not set") {
- intercept[OptionException] {
- new CommandLineOptions(Seq("--profile"))
- }
- }
-
- describe("#toConfig") {
- it("should include values specified in file") {
-
- val pathToProfileFixture: String = new File(getClass.getResource("/fixtures/profile.json").toURI).getAbsolutePath
- val options = new CommandLineOptions(Seq("--profile="+pathToProfileFixture))
-
- val config: Config = options.toConfig
-
- config.entrySet() should not be ('empty)
- config.getInt("stdin_port") should be(12345)
- config.getInt("shell_port") should be(54321)
- config.getInt("iopub_port") should be(11111)
- config.getInt("control_port") should be(22222)
- config.getInt("hb_port") should be(33333)
- }
- }
- }
-
- describe("when received --<protocol port name>=<value>"){
- it("should error if value is not set") {
- intercept[OptionException] {
- new CommandLineOptions(Seq("--stdin-port"))
- }
- intercept[OptionException] {
- new CommandLineOptions(Seq("--shell-port"))
- }
- intercept[OptionException] {
- new CommandLineOptions(Seq("--iopub-port"))
- }
- intercept[OptionException] {
- new CommandLineOptions(Seq("--control-port"))
- }
- intercept[OptionException] {
- new CommandLineOptions(Seq("--heartbeat-port"))
- }
- }
-
- describe("#toConfig") {
- it("should return config with commandline option values") {
-
- val options = new CommandLineOptions(List(
- "--stdin-port", "99999",
- "--shell-port", "88888",
- "--iopub-port", "77777",
- "--control-port", "55555",
- "--heartbeat-port", "44444"
- ))
-
- val config: Config = options.toConfig
-
- config.entrySet() should not be ('empty)
- config.getInt("stdin_port") should be(99999)
- config.getInt("shell_port") should be(88888)
- config.getInt("iopub_port") should be(77777)
- config.getInt("control_port") should be(55555)
- config.getInt("hb_port") should be(44444)
- }
- }
- }
-
- describe("when received --profile and --<protocol port name>=<value>"){
- describe("#toConfig") {
- it("should return config with <protocol port> argument value") {
-
- val pathToProfileFixture: String = (new File(getClass.getResource("/fixtures/profile.json").toURI)).getAbsolutePath
- val options = new CommandLineOptions(List("--profile", pathToProfileFixture, "--stdin-port", "99999", "--shell-port", "88888"))
-
- val config: Config = options.toConfig
-
- config.entrySet() should not be ('empty)
- config.getInt("stdin_port") should be(99999)
- config.getInt("shell_port") should be(88888)
- config.getInt("iopub_port") should be(11111)
- config.getInt("control_port") should be(22222)
- }
- }
-
- }
-
- describe("when no arguments are received"){
- describe("#toConfig") {
- it("should read default value set in reference.conf") {
-
- val options = new CommandLineOptions(Nil)
-
- val config: Config = options.toConfig
- config.getInt("stdin_port") should be(48691)
- config.getInt("shell_port") should be(40544)
- config.getInt("iopub_port") should be(43462)
- config.getInt("control_port") should be(44808)
- }
- }
- }
-
- describe("when using -- to separate interpreter arguments"){
- describe("#toConfig") {
- it("should return interpreter_args config property when there are args before --") {
-
- val options = new CommandLineOptions(List("--stdin-port", "99999", "--shell-port", "88888", "--", "someArg1", "someArg2", "someArg3"))
-
- val config: Config = options .toConfig
-
- config.entrySet() should not be ('empty)
- config.getStringList("interpreter_args").asScala should be (List("someArg1", "someArg2", "someArg3"))
- }
-
- it("should return interpreter_args config property when args is at the beginning") {
-
- val options = new CommandLineOptions(List("--", "someArg1", "someArg2", "someArg3"))
-
- val config: Config = options .toConfig
-
- config.entrySet() should not be ('empty)
- config.getStringList("interpreter_args").asScala should be (List("someArg1", "someArg2", "someArg3"))
- }
-
- it("should return interpreter_args config property as empty list when there is nothing after --") {
-
- val options = new CommandLineOptions(List("--stdin-port", "99999", "--shell-port", "88888", "--"))
-
- val config: Config = options .toConfig
-
- config.entrySet() should not be ('empty)
- config.getStringList("interpreter_args").asScala should be ('empty)
- }
- }
- }
-
- describe("when received --ip=<value>") {
- it("should error if value is not set") {
- intercept[OptionException] {
- new CommandLineOptions(Seq("--ip"))
- }
- }
-
- describe("#toConfig") {
- it("should set ip to specified value") {
- val expected = "1.2.3.4"
- val options = new CommandLineOptions(s"--ip=${expected}" :: Nil)
- val config: Config = options.toConfig
-
- config.getString("ip") should be(expected)
- }
-
- it("should set ip to 127.0.0.1") {
- val options = new CommandLineOptions(Nil)
- val config: Config = options.toConfig
-
- config.getString("ip") should be("127.0.0.1")
- }
- }
- }
-
- describe("when received options with surrounding whitespace") {
- it("should trim whitespace") {
- val url1 = "url1"
- val url2 = "url2"
-
- val options = new CommandLineOptions(Seq(
- " --magic-url ", s" ${url1}\t",
- "--magic-url", s" \t ${url2} \t"
- ))
- val config: Config = options.toConfig
-
- config.getList("magic_urls").unwrapped.asScala should
- be (Seq(url1, url2))
- }
- }
-
- describe("when received --interpreter-plugin") {
- it("should return the interpreter-plugin along with the defaults") {
- val options = new CommandLineOptions(Seq(
- "--interpreter-plugin",
- "dummy:test.utils.DummyInterpreter"
- ))
-
- val config: Config = options.toConfig
-
- val p = config.getList("interpreter_plugins")
-
- p should not be empty
-
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/comm/KernelCommManagerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/comm/KernelCommManagerSpec.scala b/kernel/src/test/scala/com/ibm/spark/comm/KernelCommManagerSpec.scala
deleted file mode 100644
index 7b4442c..0000000
--- a/kernel/src/test/scala/com/ibm/spark/comm/KernelCommManagerSpec.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.comm
-
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.content.CommContent
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import org.scalatest.mock.MockitoSugar
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
-
-class KernelCommManagerSpec extends FunSpec with Matchers with BeforeAndAfter
- with MockitoSugar
-{
- private val TestTargetName = "some target"
-
- private var mockActorLoader: ActorLoader = _
- private var mockKMBuilder: KMBuilder = _
- private var mockCommRegistrar: CommRegistrar = _
- private var kernelCommManager: KernelCommManager = _
-
- private var generatedCommWriter: CommWriter = _
-
- before {
- mockActorLoader = mock[ActorLoader]
- mockKMBuilder = mock[KMBuilder]
- mockCommRegistrar = mock[CommRegistrar]
-
- kernelCommManager = new KernelCommManager(
- mockActorLoader,
- mockKMBuilder,
- mockCommRegistrar
- ) {
- override protected def newCommWriter(commId: UUID): CommWriter = {
- val commWriter = super.newCommWriter(commId)
-
- generatedCommWriter = commWriter
-
- val spyCommWriter = spy(commWriter)
- doNothing().when(spyCommWriter)
- .sendCommKernelMessage(any[KernelMessageContent with CommContent])
-
- spyCommWriter
- }
- }
- }
-
- describe("KernelCommManager") {
- describe("#open") {
- it("should return a wrapped instance of KernelCommWriter") {
- kernelCommManager.open(TestTargetName, v5.MsgData.Empty)
-
- // Exposed hackishly for testing
- generatedCommWriter shouldBe a [KernelCommWriter]
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/comm/KernelCommWriterSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/comm/KernelCommWriterSpec.scala b/kernel/src/test/scala/com/ibm/spark/comm/KernelCommWriterSpec.scala
deleted file mode 100644
index eb792bb..0000000
--- a/kernel/src/test/scala/com/ibm/spark/comm/KernelCommWriterSpec.scala
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.comm
-
-import java.util.UUID
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.content._
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import play.api.libs.json.Json
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSelection, ActorSystem}
-import akka.testkit.{TestProbe, TestKit}
-import com.typesafe.config.ConfigFactory
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-object KernelCommWriterSpec {
- val config ="""
- akka {
- loglevel = "WARNING"
- }"""
-}
-
-class KernelCommWriterSpec extends TestKit(
- ActorSystem("KernelCommWriterSpec",
- ConfigFactory.parseString(KernelCommWriterSpec.config))
-) with FunSpecLike with Matchers with BeforeAndAfter with MockitoSugar
-{
-
- private val commId = UUID.randomUUID().toString
- private var kernelCommWriter: KernelCommWriter = _
- private var kernelMessageBuilder: KMBuilder = _
-
- private var actorLoader: ActorLoader = _
- private var kernelMessageRelayProbe: TestProbe = _
-
- /**
- * Retrieves the next message available.
- *
- * @return The KernelMessage instance (or an error if timed out)
- */
- private def getNextMessage =
- kernelMessageRelayProbe.receiveOne(200.milliseconds)
- .asInstanceOf[KernelMessage]
-
- /**
- * Retrieves the next message available and returns its type.
- *
- * @return The type of the message (pulled from message header)
- */
- private def getNextMessageType = getNextMessage.header.msg_type
-
- /**
- * Retrieves the next message available and parses the content string.
- *
- * @tparam T The type to coerce the content string into
- *
- * @return The resulting KernelMessageContent instance
- */
- private def getNextMessageContents[T <: KernelMessageContent]
- (implicit fjs: play.api.libs.json.Reads[T], mf: Manifest[T]) =
- {
- val receivedMessage = getNextMessage
-
- Json.parse(receivedMessage.contentString).as[T]
- }
-
- before {
- kernelMessageBuilder = spy(KMBuilder())
-
- // Construct path for kernel message relay
- actorLoader = mock[ActorLoader]
- kernelMessageRelayProbe = TestProbe()
- val kernelMessageRelaySelection: ActorSelection =
- system.actorSelection(kernelMessageRelayProbe.ref.path.toString)
- doReturn(kernelMessageRelaySelection)
- .when(actorLoader).load(SystemActorType.KernelMessageRelay)
-
- // Create a new writer to use for testing
- kernelCommWriter = new KernelCommWriter(actorLoader, kernelMessageBuilder, commId)
- }
-
- describe("KernelCommWriter") {
- describe("#writeOpen") {
- it("should send a comm_open message to the relay") {
- kernelCommWriter.writeOpen(anyString())
-
- getNextMessageType should be (CommOpen.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- kernelCommWriter.writeOpen(anyString())
-
- val actual = getNextMessageContents[CommOpen].comm_id
-
- actual should be (expected)
- }
-
- it("should include the target name in the message") {
- val expected = "<TARGET_NAME>"
- kernelCommWriter.writeOpen(expected)
-
- val actual = getNextMessageContents[CommOpen].target_name
-
- actual should be (expected)
- }
-
- it("should provide empty data in the message if no data is provided") {
- val expected = MsgData.Empty
- kernelCommWriter.writeOpen(anyString())
-
- val actual = getNextMessageContents[CommOpen].data
-
- actual should be (expected)
- }
-
- it("should include the data in the message") {
- val expected = MsgData("some key" -> "some value")
- kernelCommWriter.writeOpen(anyString(), expected)
-
- val actual = getNextMessageContents[CommOpen].data
-
- actual should be (expected)
- }
- }
-
- describe("#writeMsg") {
- it("should send a comm_msg message to the relay") {
- kernelCommWriter.writeMsg(MsgData.Empty)
-
- getNextMessageType should be (CommMsg.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- kernelCommWriter.writeMsg(MsgData.Empty)
-
- val actual = getNextMessageContents[CommMsg].comm_id
-
- actual should be (expected)
- }
-
- it("should fail a require if the data is null") {
- intercept[IllegalArgumentException] {
- kernelCommWriter.writeMsg(null)
- }
- }
-
- it("should include the data in the message") {
- val expected = MsgData("some key" -> "some value")
- kernelCommWriter.writeMsg(expected)
-
- val actual = getNextMessageContents[CommMsg].data
-
- actual should be (expected)
- }
- }
-
- describe("#writeClose") {
- it("should send a comm_close message to the relay") {
- kernelCommWriter.writeClose()
-
- getNextMessageType should be (CommClose.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- kernelCommWriter.writeClose()
-
- val actual = getNextMessageContents[CommClose].comm_id
-
- actual should be (expected)
- }
-
- it("should provide empty data in the message if no data is provided") {
- val expected = MsgData.Empty
- kernelCommWriter.writeClose()
-
- val actual = getNextMessageContents[CommClose].data
-
- actual should be (expected)
- }
-
- it("should include the data in the message") {
- val expected = MsgData("some key" -> "some value")
- kernelCommWriter.writeClose(expected)
-
- val actual = getNextMessageContents[CommClose].data
-
- actual should be (expected)
- }
- }
-
- describe("#write") {
- it("should send a comm_msg message to the relay") {
- kernelCommWriter.write(Array('a'), 0, 1)
-
- getNextMessageType should be (CommMsg.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- kernelCommWriter.write(Array('a'), 0, 1)
-
- val actual = getNextMessageContents[CommMsg].comm_id
-
- actual should be (expected)
- }
-
- it("should package the string as part of the data with a 'message' key") {
- val expected = MsgData("message" -> "a")
- kernelCommWriter.write(Array('a'), 0, 1)
-
- val actual = getNextMessageContents[CommMsg].data
-
- actual should be (expected)
- }
- }
-
- describe("#flush") {
- it("should do nothing") {
- // TODO: Is this test necessary? It does nothing.
- kernelCommWriter.flush()
- }
- }
-
- describe("#close") {
- it("should send a comm_close message to the relay") {
- kernelCommWriter.close()
-
- getNextMessageType should be (CommClose.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- kernelCommWriter.close()
-
- val actual = getNextMessageContents[CommClose].comm_id
-
- actual should be (expected)
- }
-
- it("should provide empty data in the message") {
- val expected = MsgData.Empty
- kernelCommWriter.close()
-
- val actual = getNextMessageContents[CommClose].data
-
- actual should be (expected)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/global/ExecutionCounterSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/global/ExecutionCounterSpec.scala b/kernel/src/test/scala/com/ibm/spark/global/ExecutionCounterSpec.scala
deleted file mode 100644
index 4d1641f..0000000
--- a/kernel/src/test/scala/com/ibm/spark/global/ExecutionCounterSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.global
-
-import org.scalatest.{FunSpec, Matchers}
-
-class ExecutionCounterSpec extends FunSpec with Matchers {
- describe("ExecutionCounter") {
- describe("#increment( String )"){
- it("should increment value when key is not present"){
- ExecutionCounter incr "foo" should be(1)
- }
- it("should increment value for key when it is present"){
- ExecutionCounter incr "bar" should be(1)
- ExecutionCounter incr "bar" should be(2)
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
deleted file mode 100644
index 58ea0c5..0000000
--- a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-package com.ibm.spark.kernel.api
-
-import java.io.{InputStream, PrintStream}
-
-import com.ibm.spark.boot.layer.InterpreterManager
-import com.ibm.spark.comm.CommManager
-import com.ibm.spark.interpreter._
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import com.ibm.spark.magic.MagicLoader
-import com.typesafe.config.Config
-import org.apache.spark.{SparkConf, SparkContext}
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
-import com.ibm.spark.global.ExecuteRequestState
-
-class KernelSpec extends FunSpec with Matchers with MockitoSugar
- with BeforeAndAfter
-{
- private val BadCode = Some("abc foo bar")
- private val GoodCode = Some("val foo = 1")
- private val ErrorCode = Some("val foo = bar")
- private val ErrorMsg = "Name: error\n" +
- "Message: bad\n" +
- "StackTrace: 1"
-
- private var mockConfig: Config = _
- private var mockSparkContext: SparkContext = _
- private var mockSparkConf: SparkConf = _
- private var mockActorLoader: ActorLoader = _
- private var mockInterpreter: Interpreter = _
- private var mockInterpreterManager: InterpreterManager = _
- private var mockCommManager: CommManager = _
- private var mockMagicLoader: MagicLoader = _
- private var kernel: Kernel = _
- private var spyKernel: Kernel = _
-
- before {
- mockConfig = mock[Config]
- mockInterpreter = mock[Interpreter]
- mockInterpreterManager = mock[InterpreterManager]
- mockSparkContext = mock[SparkContext]
- mockSparkConf = mock[SparkConf]
- when(mockInterpreterManager.defaultInterpreter)
- .thenReturn(Some(mockInterpreter))
- when(mockInterpreterManager.interpreters)
- .thenReturn(Map[String, com.ibm.spark.interpreter.Interpreter]())
- when(mockInterpreter.interpret(BadCode.get))
- .thenReturn((Results.Incomplete, null))
- when(mockInterpreter.interpret(GoodCode.get))
- .thenReturn((Results.Success, Left(new ExecuteOutput("ok"))))
- when(mockInterpreter.interpret(ErrorCode.get))
- .thenReturn((Results.Error, Right(ExecuteError("error","bad", List("1")))))
-
-
- mockCommManager = mock[CommManager]
- mockActorLoader = mock[ActorLoader]
- mockMagicLoader = mock[MagicLoader]
-
- kernel = new Kernel(
- mockConfig, mockActorLoader, mockInterpreterManager, mockCommManager,
- mockMagicLoader
- )
-
- spyKernel = spy(kernel)
-
- }
-
- after {
- ExecuteRequestState.reset()
- }
-
- describe("Kernel") {
- describe("#eval") {
- it("should return syntax error") {
- kernel eval BadCode should be((false, "Syntax Error!"))
- }
-
- it("should return ok") {
- kernel eval GoodCode should be((true, "ok"))
- }
-
- it("should return error") {
- kernel eval ErrorCode should be((false, ErrorMsg))
- }
-
- it("should return error on None") {
- kernel eval None should be ((false, "Error!"))
- }
- }
-
- describe("#out") {
- it("should throw an exception if the ExecuteRequestState has not been set") {
- intercept[IllegalArgumentException] {
- kernel.out
- }
- }
-
- it("should create a new PrintStream instance if the ExecuteRequestState has been set") {
- ExecuteRequestState.processIncomingKernelMessage(
- new KernelMessage(Nil, "", mock[Header], mock[ParentHeader],
- mock[Metadata], "")
- )
- kernel.out shouldBe a [PrintStream]
- }
- }
-
- describe("#err") {
- it("should throw an exception if the ExecuteRequestState has not been set") {
- intercept[IllegalArgumentException] {
- kernel.err
- }
- }
-
- it("should create a new PrintStream instance if the ExecuteRequestState has been set") {
- ExecuteRequestState.processIncomingKernelMessage(
- new KernelMessage(Nil, "", mock[Header], mock[ParentHeader],
- mock[Metadata], "")
- )
-
- // TODO: Access the underlying streamType field to assert stderr?
- kernel.err shouldBe a [PrintStream]
- }
- }
-
- describe("#in") {
- it("should throw an exception if the ExecuteRequestState has not been set") {
- intercept[IllegalArgumentException] {
- kernel.in
- }
- }
-
- it("should create a new InputStream instance if the ExecuteRequestState has been set") {
- ExecuteRequestState.processIncomingKernelMessage(
- new KernelMessage(Nil, "", mock[Header], mock[ParentHeader],
- mock[Metadata], "")
- )
-
- kernel.in shouldBe a [InputStream]
- }
- }
-
- describe("#stream") {
- it("should throw an exception if the ExecuteRequestState has not been set") {
- intercept[IllegalArgumentException] {
- kernel.stream
- }
- }
-
- it("should create a StreamMethods instance if the ExecuteRequestState has been set") {
- ExecuteRequestState.processIncomingKernelMessage(
- new KernelMessage(Nil, "", mock[Header], mock[ParentHeader],
- mock[Metadata], "")
- )
-
- kernel.stream shouldBe a [StreamMethods]
- }
- }
-
- describe("when spark.master is set in config") {
-
- it("should create SparkConf") {
- val expected = "some value"
- doReturn(expected).when(mockConfig).getString("spark.master")
- doReturn("").when(mockConfig).getString("spark_configuration")
-
- // Provide stub for interpreter classServerURI since also executed
- doReturn("").when(mockInterpreter).classServerURI
-
- val sparkConf = kernel.createSparkConf(new SparkConf().setMaster(expected))
-
- sparkConf.get("spark.master") should be (expected)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/api/StreamMethodsSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/api/StreamMethodsSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/api/StreamMethodsSpec.scala
deleted file mode 100644
index fc87588..0000000
--- a/kernel/src/test/scala/com/ibm/spark/kernel/api/StreamMethodsSpec.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.ibm.spark.kernel.api
-
-import akka.actor.ActorSystem
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5.KernelMessage
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSpecLike, BeforeAndAfter, Matchers, FunSpec}
-import play.api.libs.json.Json
-
-import scala.concurrent.duration._
-
-import org.mockito.Mockito._
-
-class StreamMethodsSpec extends TestKit(
- ActorSystem("StreamMethodsSpec")
-) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
- with BeforeAndAfter
-{
- private val MaxDuration = 300.milliseconds
-
- private var kernelMessageRelayProbe: TestProbe = _
- private var mockParentHeader: v5.ParentHeader = _
- private var mockActorLoader: v5.kernel.ActorLoader = _
- private var mockKernelMessage: v5.KernelMessage = _
- private var streamMethods: StreamMethods = _
-
- before {
- kernelMessageRelayProbe = TestProbe()
-
- mockParentHeader = mock[v5.ParentHeader]
-
- mockActorLoader = mock[v5.kernel.ActorLoader]
- doReturn(system.actorSelection(kernelMessageRelayProbe.ref.path))
- .when(mockActorLoader).load(v5.SystemActorType.KernelMessageRelay)
-
- mockKernelMessage = mock[v5.KernelMessage]
- doReturn(mockParentHeader).when(mockKernelMessage).header
-
- streamMethods = new StreamMethods(mockActorLoader, mockKernelMessage)
- }
-
- describe("StreamMethods") {
- describe("#()") {
- it("should put the header of the given message as the parent header") {
- val expected = mockKernelMessage.header
- val actual = streamMethods.kmBuilder.build.parentHeader
-
- actual should be (expected)
- }
- }
-
- describe("#sendAll") {
- it("should send a message containing all of the given text") {
- val expected = "some text"
-
- streamMethods.sendAll(expected)
-
- val outgoingMessage = kernelMessageRelayProbe.receiveOne(MaxDuration)
- val kernelMessage = outgoingMessage.asInstanceOf[KernelMessage]
-
- val actual = Json.parse(kernelMessage.contentString)
- .as[v5.content.StreamContent].text
-
- actual should be (expected)
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/dispatch/StatusDispatchSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/dispatch/StatusDispatchSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/dispatch/StatusDispatchSpec.scala
deleted file mode 100644
index 60d3b42..0000000
--- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/dispatch/StatusDispatchSpec.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.dispatch
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.{TestKit, TestProbe}
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.content.KernelStatus
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import play.api.libs.json.Json
-
-import scala.concurrent.duration._
-
-class StatusDispatchSpec extends TestKit(ActorSystem("StatusDispatchSystem"))
-with FunSpecLike with Matchers with MockitoSugar with BeforeAndAfter{
- var statusDispatchRef: ActorRef = _
- var relayProbe: TestProbe = _
- before {
- // Mock the relay with a probe
- relayProbe = TestProbe()
- // Mock the ActorLoader
- val mockActorLoader: ActorLoader = mock[ActorLoader]
- when(mockActorLoader.load(SystemActorType.KernelMessageRelay))
- .thenReturn(system.actorSelection(relayProbe.ref.path.toString))
-
- statusDispatchRef = system.actorOf(Props(classOf[StatusDispatch],mockActorLoader))
- }
-
-
- describe("StatusDispatch") {
- describe("#receive( KernelStatusType )") {
- it("should send a status message to the relay") {
- statusDispatchRef ! KernelStatusType.Busy
- // Check the kernel message is the correct type
- val statusMessage: KernelMessage = relayProbe.receiveOne(500.milliseconds).asInstanceOf[KernelMessage]
- statusMessage.header.msg_type should be (MessageType.Outgoing.Status.toString)
- // Check the status is what we sent
- val status: KernelStatus = Json.parse(statusMessage.contentString).as[KernelStatus]
- status.execution_state should be (KernelStatusType.Busy.toString)
- }
- }
-
- describe("#receive( KernelStatusType, Header )") {
- it("should send a status message to the relay") {
- val tuple = Tuple2(KernelStatusType.Busy, mock[Header])
- statusDispatchRef ! tuple
- // Check the kernel message is the correct type
- val statusMessage: KernelMessage = relayProbe.receiveOne(500.milliseconds).asInstanceOf[KernelMessage]
- statusMessage.header.msg_type should be (MessageType.Outgoing.Status.toString)
- // Check the status is what we sent
- val status: KernelStatus = Json.parse(statusMessage.contentString).as[KernelStatus]
- status.execution_state should be (KernelStatusType.Busy.toString)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CodeCompleteHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CodeCompleteHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CodeCompleteHandlerSpec.scala
deleted file mode 100644
index b44ad1c..0000000
--- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CodeCompleteHandlerSpec.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.handler
-
-import akka.actor._
-import akka.testkit.{TestProbe, ImplicitSender, TestKit}
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.content.CompleteRequest
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import com.ibm.spark.kernel.protocol.v5Test._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSpecLike, BeforeAndAfter, Matchers}
-import org.mockito.Mockito._
-import scala.concurrent.duration._
-
-class CodeCompleteHandlerSpec extends TestKit(
- ActorSystem("CodeCompleteHandlerSpec")
-) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
- with BeforeAndAfter {
-
- var actorLoader: ActorLoader = _
- var handlerActor: ActorRef = _
- var kernelMessageRelayProbe: TestProbe = _
- var interpreterProbe: TestProbe = _
- var statusDispatchProbe: TestProbe = _
-
- before {
- actorLoader = mock[ActorLoader]
-
- handlerActor = system.actorOf(Props(classOf[CodeCompleteHandler], actorLoader))
-
- kernelMessageRelayProbe = TestProbe()
- when(actorLoader.load(SystemActorType.KernelMessageRelay))
- .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString))
-
- interpreterProbe = new TestProbe(system)
- when(actorLoader.load(SystemActorType.Interpreter))
- .thenReturn(system.actorSelection(interpreterProbe.ref.path.toString))
-
- statusDispatchProbe = new TestProbe(system)
- when(actorLoader.load(SystemActorType.StatusDispatch))
- .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString))
- }
-
- def replyToHandlerWithOkAndResult() = {
- val expectedClass = classOf[CompleteRequest]
- interpreterProbe.expectMsgClass(expectedClass)
- interpreterProbe.reply((0, List[String]()))
- }
-
- def replyToHandlerWithOkAndBadResult() = {
- val expectedClass = classOf[CompleteRequest]
- interpreterProbe.expectMsgClass(expectedClass)
- interpreterProbe.reply("hello")
- }
-
- describe("CodeCompleteHandler (ActorLoader)") {
- it("should send a CompleteRequest") {
- handlerActor ! MockCompleteRequestKernelMessage
- replyToHandlerWithOkAndResult()
- kernelMessageRelayProbe.fishForMessage(500.milliseconds) {
- case KernelMessage(_, _, header, _, _, _) =>
- header.msg_type == MessageType.Outgoing.CompleteReply.toString
- }
- }
-
- it("should throw an error for bad JSON") {
- handlerActor ! MockKernelMessageWithBadJSON
- var result = false
- try {
- replyToHandlerWithOkAndResult()
- }
- catch {
- case t: Throwable => result = true
- }
- result should be (true)
- }
-
- it("should throw an error for bad code completion") {
- handlerActor ! MockCompleteRequestKernelMessage
- try {
- replyToHandlerWithOkAndBadResult()
- }
- catch {
- case error: Exception => error.getMessage should be ("Parse error in CodeCompleteHandler")
- }
- }
-
- it("should send an idle message") {
- handlerActor ! MockCompleteRequestKernelMessage
- replyToHandlerWithOkAndResult()
- statusDispatchProbe.fishForMessage(500.milliseconds) {
- case Tuple2(status, _) =>
- status == KernelStatusType.Idle
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommCloseHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommCloseHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommCloseHandlerSpec.scala
deleted file mode 100644
index 49582f8..0000000
--- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommCloseHandlerSpec.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.handler
-
-import java.util.UUID
-
-import akka.actor.{Props, ActorRef, ActorSystem}
-import akka.testkit.{TestProbe, ImplicitSender, TestKit}
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5.content.{ClearOutput, CommClose}
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.{KernelMessage, SystemActorType, KMBuilder}
-import com.ibm.spark.comm.{CommRegistrar, CommWriter, CommCallbacks, CommStorage}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-import scala.concurrent.duration._
-
-class CommCloseHandlerSpec extends TestKit(
- ActorSystem("CommCloseHandlerSpec")
-) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
- with BeforeAndAfter
-{
- private val TestCommId = UUID.randomUUID().toString
- private val TestTargetName = "some test target"
-
- private var kmBuilder: KMBuilder = _
- private var spyCommStorage: CommStorage = _
- private var mockCommCallbacks: CommCallbacks = _
- private var mockCommRegistrar: CommRegistrar = _
- private var mockActorLoader: ActorLoader = _
- private var commCloseHandler: ActorRef = _
- private var kernelMessageRelayProbe: TestProbe = _
- private var statusDispatchProbe: TestProbe = _
-
- before {
- kmBuilder = KMBuilder()
- mockCommCallbacks = mock[CommCallbacks]
- spyCommStorage = spy(new CommStorage())
- mockCommRegistrar = mock[CommRegistrar]
-
- mockActorLoader = mock[ActorLoader]
-
- commCloseHandler = system.actorOf(Props(
- classOf[CommCloseHandler],
- mockActorLoader, mockCommRegistrar, spyCommStorage
- ))
-
- // Used to intercept responses
- kernelMessageRelayProbe = TestProbe()
- when(mockActorLoader.load(SystemActorType.KernelMessageRelay))
- .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString))
-
- // Used to intercept busy/idle messages
- statusDispatchProbe = new TestProbe(system)
- when(mockActorLoader.load(SystemActorType.StatusDispatch))
- .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString))
- }
-
- describe("CommCloseHandler") {
- describe("#process") {
- it("should execute close callbacks if the id is registered") {
- // Mark our id as registered
- doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
- .getCommIdCallbacks(TestCommId)
-
- // Send a comm_open message with the test target
- commCloseHandler ! kmBuilder
- .withHeader(CommClose.toTypeString)
- .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
- .build
-
- // Should receive a busy and an idle message
- statusDispatchProbe.receiveN(2, 200.milliseconds)
-
- // Verify that the msg callbacks were triggered along the way
- verify(mockCommCallbacks).executeCloseCallbacks(
- any[CommWriter], any[v5.UUID], any[v5.MsgData])
- }
-
- it("should not execute close callbacks if the id is not registered") {
- // Mark our target as not registered
- doReturn(None).when(spyCommStorage).getCommIdCallbacks(TestCommId)
-
- // Send a comm_msg message with the test id
- commCloseHandler ! kmBuilder
- .withHeader(CommClose.toTypeString)
- .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
- .build
-
- // Should receive a busy and an idle message
- statusDispatchProbe.receiveN(2, 200.milliseconds)
-
- // Verify that the msg callbacks were NOT triggered along the way
- verify(mockCommCallbacks, never()).executeCloseCallbacks(
- any[CommWriter], any[v5.UUID], any[v5.MsgData])
- }
-
- it("should do nothing if there is a parsing error") {
- // Send a comm_open message with an invalid content string
- commCloseHandler ! kmBuilder
- .withHeader(CommClose.toTypeString)
- .withContentString(ClearOutput(_wait = true))
- .build
-
- // TODO: Is there a better way to test for this without an upper time
- // limit? Is there a different logical approach?
- kernelMessageRelayProbe.expectNoMsg(200.milliseconds)
- }
-
- it("should include the parent's header in the parent header of " +
- "outgoing messages"){
-
- // Register a callback that sends a message using the comm writer
- val closeCallback: CommCallbacks.CloseCallback =
- new CommCallbacks.CloseCallback() {
- def apply(v1: CommWriter, v2: v5.UUID, v4: v5.MsgData) =
- v1.writeMsg(v5.MsgData.Empty)
- }
- val callbacks = (new CommCallbacks).addCloseCallback(closeCallback)
- doReturn(Some(callbacks)).when(spyCommStorage)
- .getCommIdCallbacks(TestCommId)
-
- // Send a comm close message
- val msg = kmBuilder
- .withHeader(CommClose.toTypeString)
- .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
- .build
- commCloseHandler ! msg
-
- // Verify that the message sent by the handler has the desired property
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- case KernelMessage(_, _, _, parentHeader, _, _) =>
- parentHeader == msg.header
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
deleted file mode 100644
index 582a08e..0000000
--- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.handler
-
-import java.util.UUID
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.comm._
-import com.ibm.spark.kernel.protocol.v5.content.{CommMsg, ClearOutput, CommOpen}
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-
-import scala.concurrent.duration._
-
-class CommMsgHandlerSpec extends TestKit( // spec for CommMsgHandler, run inside its own TestKit actor system
- ActorSystem("CommMsgHandlerSpec")
-) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
- with BeforeAndAfter
-{
- private val TestCommId = UUID.randomUUID().toString
- private val TestTargetName = "some test target" // NOTE(review): not referenced anywhere in this spec
-
- private var kmBuilder: KMBuilder = _
- private var spyCommStorage: CommStorage = _
- private var mockCommCallbacks: CommCallbacks = _
- private var mockActorLoader: ActorLoader = _
- private var commMsgHandler: ActorRef = _
- private var kernelMessageRelayProbe: TestProbe = _
- private var statusDispatchProbe: TestProbe = _
-
- before { // recreate the builder, mocks, handler actor and probes for every test
- kmBuilder = KMBuilder()
- mockCommCallbacks = mock[CommCallbacks]
- spyCommStorage = spy(new CommStorage())
-
- mockActorLoader = mock[ActorLoader]
-
- commMsgHandler = system.actorOf(Props(
- classOf[CommMsgHandler],
- mockActorLoader, mock[CommRegistrar], spyCommStorage
- ))
-
- // Used to intercept responses
- kernelMessageRelayProbe = TestProbe()
- when(mockActorLoader.load(SystemActorType.KernelMessageRelay))
- .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString))
-
- // Used to intercept busy/idle messages
- statusDispatchProbe = new TestProbe(system)
- when(mockActorLoader.load(SystemActorType.StatusDispatch))
- .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString))
- }
-
- describe("CommMsgHandler") {
- describe("#process") {
- it("should execute msg callbacks if the id is registered") {
- // Mark our id as registered
- doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
- .getCommIdCallbacks(TestCommId)
-
- // Send a comm_msg message with the test id
- commMsgHandler ! kmBuilder
- .withHeader(CommMsg.toTypeString)
- .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
- .build
-
- // Should receive a busy and an idle message
- statusDispatchProbe.receiveN(2, 200.milliseconds)
-
- // Verify that the msg callbacks were triggered along the way
- verify(mockCommCallbacks).executeMsgCallbacks(
- any[CommWriter], any[v5.UUID], any[v5.MsgData])
- }
-
- it("should not execute msg callbacks if the id is not registered") {
- // Mark our id as not registered
- doReturn(None).when(spyCommStorage).getCommIdCallbacks(TestCommId)
-
- // Send a comm_msg message with the test id
- commMsgHandler ! kmBuilder
- .withHeader(CommMsg.toTypeString)
- .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
- .build
-
- // Should receive a busy and an idle message
- statusDispatchProbe.receiveN(2, 200.milliseconds)
-
- // Verify that the msg callbacks were NOT triggered along the way
- verify(mockCommCallbacks, never()).executeMsgCallbacks(
- any[CommWriter], any[v5.UUID], any[v5.MsgData])
- }
-
- it("should do nothing if there is a parsing error") {
- // Send a comm_msg message with an invalid content string
- commMsgHandler ! kmBuilder
- .withHeader(CommMsg.toTypeString)
- .withContentString(ClearOutput(_wait = true)) // ClearOutput content under a CommMsg header
- .build
-
- // TODO: Is there a better way to test for this without an upper time
- // limit? Is there a different logical approach?
- kernelMessageRelayProbe.expectNoMsg(200.milliseconds)
- }
-
- it("should include the parent's header in the parent header of " +
- "outgoing messages"){
-
- // Register a msg callback that sends a message using the comm writer
- val msgCallback: CommCallbacks.MsgCallback =
- new CommCallbacks.MsgCallback() {
- def apply(v1: CommWriter, v2: v5.UUID, v3: v5.MsgData): Unit =
- v1.writeMsg(MsgData.Empty)
- }
- val callbacks = (new CommCallbacks).addMsgCallback(msgCallback)
- doReturn(Some(callbacks)).when(spyCommStorage) // stub the spy so the test comm id yields these callbacks
- .getCommIdCallbacks(TestCommId)
-
- // Send a comm_msg message with the test id
- val msg = kmBuilder
- .withHeader(CommMsg.toTypeString)
- .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
- .build
- commMsgHandler ! msg
-
- // Verify that a relayed message carries the request's header as its parent header
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- case KernelMessage(_, _, _, parentHeader, _, _) =>
- parentHeader == msg.header
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommOpenHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommOpenHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommOpenHandlerSpec.scala
deleted file mode 100644
index 64013e9..0000000
--- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommOpenHandlerSpec.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.handler
-
-import java.util.UUID
-
-import com.ibm.spark.kernel.protocol.v5
-
-import akka.actor.{Props, ActorRef, ActorSystem}
-import akka.testkit.{TestProbe, ImplicitSender, TestKit}
-import com.ibm.spark.kernel.protocol.v5.content.{CommClose, ClearOutput, CommOpen}
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.comm._
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-
-import scala.concurrent.duration._
-
-class CommOpenHandlerSpec extends TestKit( // spec for CommOpenHandler, run inside its own TestKit actor system
- ActorSystem("CommOpenHandlerSpec")
-) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
- with BeforeAndAfter
-{
- private val TestCommId = UUID.randomUUID().toString
- private val TestTargetName = "some test target"
-
- private var kmBuilder: KMBuilder = _
- private var spyCommStorage: CommStorage = _
- private var mockCommCallbacks: CommCallbacks = _
- private var mockCommRegistrar: CommRegistrar = _
- private var mockActorLoader: ActorLoader = _
- private var commOpenHandler: ActorRef = _
- private var kernelMessageRelayProbe: TestProbe = _
- private var statusDispatchProbe: TestProbe = _
-
- before { // recreate the builder, mocks, handler actor and probes for every test
- kmBuilder = KMBuilder()
- mockCommCallbacks = mock[CommCallbacks]
- spyCommStorage = spy(new CommStorage())
- mockCommRegistrar = mock[CommRegistrar]
-
- mockActorLoader = mock[ActorLoader]
-
- commOpenHandler = system.actorOf(Props(
- classOf[CommOpenHandler],
- mockActorLoader, mockCommRegistrar, spyCommStorage
- ))
-
- // Used to intercept responses
- kernelMessageRelayProbe = TestProbe()
- when(mockActorLoader.load(SystemActorType.KernelMessageRelay))
- .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString))
-
- // Used to intercept busy/idle messages
- statusDispatchProbe = new TestProbe(system)
- when(mockActorLoader.load(SystemActorType.StatusDispatch))
- .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString))
- }
-
- describe("CommOpenHandler") {
- describe("#process") {
- it("should execute open callbacks if the target exists") {
- // Mark our target as registered
- doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
- .getTargetCallbacks(TestTargetName)
-
- // Send a comm_open message with the test target
- commOpenHandler ! kmBuilder
- .withHeader(CommOpen.toTypeString)
- .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
- .build
-
- // Should receive a busy and an idle message
- statusDispatchProbe.receiveN(2, 200.milliseconds)
-
- // Verify that the open callbacks were triggered along the way
- verify(mockCommCallbacks).executeOpenCallbacks(
- any[CommWriter], any[v5.UUID], anyString(), any[v5.MsgData])
- }
-
- it("should close the comm connection if the target does not exist") {
- // Mark our target as not registered
- doReturn(None).when(spyCommStorage).getTargetCallbacks(TestTargetName)
-
- // Send a comm_open message with the test target
- commOpenHandler ! kmBuilder
- .withHeader(CommOpen.toTypeString)
- .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
- .build
-
- // Should receive a close message as a result of the target missing
- kernelMessageRelayProbe.expectMsgPF(200.milliseconds) {
- case KernelMessage(_, _, header, _, _, _) =>
- header.msg_type should be (CommClose.toTypeString)
- }
- }
-
- it("should do nothing if there is a parsing error") {
- // Send a comm_open message with an invalid content string
- commOpenHandler ! kmBuilder
- .withHeader(CommOpen.toTypeString)
- .withContentString(ClearOutput(_wait = true)) // ClearOutput content under a CommOpen header
- .build
-
- // TODO: Is there a better way to test for this without an upper time
- // limit? Is there a different logical approach?
- kernelMessageRelayProbe.expectNoMsg(200.milliseconds)
- }
-
- it("should include the parent's header in the parent header of " +
- "outgoing messages"){
-
- // Register an open callback that sends a message using the comm writer
- val openCallback: CommCallbacks.OpenCallback =
- new CommCallbacks.OpenCallback() {
- def apply(v1: CommWriter, v2: v5.UUID, v3: String, v4: v5.MsgData) =
- v1.writeMsg(MsgData.Empty)
- }
- val callbacks = (new CommCallbacks).addOpenCallback(openCallback)
- doReturn(Some(callbacks)).when(spyCommStorage) // stub the target lookup, as the open-callback test above does,
- .getTargetCallbacks(TestTargetName) // so the registered callback is the one producing the outgoing message
-
- // Send a comm_open message
- val msg = kmBuilder
- .withHeader(CommOpen.toTypeString)
- .withContentString(
- CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty)
- )
- .build
- commOpenHandler ! msg
-
- // Verify that a relayed message carries the request's header as its parent header
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- case KernelMessage(_, _, _, parentHeader, _, _) =>
- parentHeader == msg.header
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala
deleted file mode 100644
index c22bc41..0000000
--- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.handler
-
-import java.io.OutputStream
-import java.util.concurrent.atomic.AtomicInteger
-
-import akka.actor._
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import com.ibm.spark.kernel.api.{FactoryMethods, FactoryMethodsLike, Kernel}
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.content._
-import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
-import com.ibm.spark.kernel.protocol.v5Test._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import play.api.libs.json.Json
-
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent._
-
-class ExecuteRequestHandlerSpec extends TestKit( // spec for ExecuteRequestHandler, run inside its own TestKit actor system
- ActorSystem("ExecuteRequestHandlerSpec")
-) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
- with BeforeAndAfter {
-
- private var mockActorLoader: ActorLoader = _
- private var mockFactoryMethods: FactoryMethods = _
- private var mockKernel: Kernel = _
- private var mockOutputStream: OutputStream = _
- private var handlerActor: ActorRef = _
- private var kernelMessageRelayProbe: TestProbe = _
- private var executeRequestRelayProbe: TestProbe = _
- private var statusDispatchProbe: TestProbe = _
-
- before { // recreate the mocks, handler actor and probes for every test
- mockActorLoader = mock[ActorLoader]
- mockFactoryMethods = mock[FactoryMethods]
- mockKernel = mock[Kernel]
- mockOutputStream = mock[OutputStream]
- doReturn(mockFactoryMethods).when(mockKernel)
- .factory(any[KernelMessage], any[KMBuilder])
-
- doReturn(mockOutputStream).when(mockFactoryMethods)
- .newKernelOutputStream(anyString(), anyBoolean())
-
- // Create the handler under test, wired to the mock actor loader and kernel
- handlerActor = system.actorOf(Props(
- classOf[ExecuteRequestHandler],
- mockActorLoader,
- mockKernel
- ))
-
- kernelMessageRelayProbe = new TestProbe(system)
- when(mockActorLoader.load(SystemActorType.KernelMessageRelay))
- .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString))
-
- executeRequestRelayProbe = new TestProbe(system)
- when(mockActorLoader.load(SystemActorType.ExecuteRequestRelay))
- .thenReturn(system.actorSelection(executeRequestRelayProbe.ref.path.toString))
-
- statusDispatchProbe = new TestProbe(system)
- when(mockActorLoader.load(SystemActorType.StatusDispatch))
- .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString))
- }
-
- /**
- * Simulates the interpreter passing back an ok execute reply together
- * with a non-empty execute result.
- */
- def replyToHandlerWithOkAndResult() = {
- // This stubs the behaviour of the interpreter executing code
- val expectedClass = classOf[(ExecuteRequest, KernelMessage, OutputStream)]
- executeRequestRelayProbe.expectMsgClass(expectedClass)
- executeRequestRelayProbe.reply((
- ExecuteReplyOk(1, Some(Payloads()), Some(UserExpressions())),
- ExecuteResult(1, Data("text/plain" -> "resulty result"), Metadata())
- ))
- }
-
- def replyToHandlerWithOk() = { // ok reply paired with an empty text/plain result
- // This stubs the behaviour of the interpreter executing code
- val expectedClass = classOf[(ExecuteRequest, KernelMessage, OutputStream)]
- executeRequestRelayProbe.expectMsgClass(expectedClass)
- executeRequestRelayProbe.reply((
- ExecuteReplyOk(1, Some(Payloads()), Some(UserExpressions())),
- ExecuteResult(1, Data("text/plain" -> ""), Metadata())
- ))
- }
-
- /**
- * Simulates the interpreter passing back an error execute reply together
- * with a non-empty execute result.
- */
- def replyToHandlerWithErrorAndResult() = {
- // This stubs the behaviour of the interpreter executing code
- val expectedClass = classOf[(ExecuteRequest, KernelMessage, OutputStream)]
- executeRequestRelayProbe.expectMsgClass(expectedClass)
- executeRequestRelayProbe.reply((
- ExecuteReplyError(1, Some(""), Some(""), Some(Nil)),
- ExecuteResult(1, Data("text/plain" -> "resulty result"), Metadata())
- ))
- }
-
- describe("ExecuteRequestHandler( ActorLoader )") {
- describe("#receive( KernelMessage ) when interpreter replies") {
-
- it("should send an execute result message if the result is not empty") {
- handlerActor ! MockExecuteRequestKernelMessage
- replyToHandlerWithOkAndResult()
- kernelMessageRelayProbe.fishForMessage(100.milliseconds) {
- case KernelMessage(_, _, header, _, _, _) =>
- header.msg_type == ExecuteResult.toTypeString
- }
- }
-
- it("should not send an execute result message if there is no result") {
- handlerActor ! MockExecuteRequestKernelMessage
- replyToHandlerWithOk()
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- case KernelMessage(_, _, header, _, _, _) =>
- header.msg_type != ExecuteResult.toTypeString
- }
- // NOTE(review): passes on the first non-result message; it does not prove that no result follows
- }
-
- it("should send an execute reply message") {
- handlerActor ! MockExecuteRequestKernelMessage
- replyToHandlerWithOkAndResult()
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- case KernelMessage(_, _, header, _, _, _) =>
- header.msg_type == ExecuteReply.toTypeString // fish for the reply, not the result (previously duplicated the first test)
- }
- }
-
- it("should send a status idle message after the reply and result") {
- handlerActor ! MockExecuteRequestKernelMessage
- replyToHandlerWithOkAndResult()
-
- val msgCount = new AtomicInteger(0)
- var statusMsgNum = -1
- var statusReceived = false // NOTE(review): plain vars shared by the two futures below without synchronization
-
- val f1 = Future { // counts result/reply messages seen on the relay before the idle status arrives
- kernelMessageRelayProbe.fishForMessage(4.seconds) {
- case KernelMessage(_, _, header, _, _, _) =>
- if (header.msg_type == ExecuteResult.toTypeString &&
- !statusReceived)
- msgCount.incrementAndGet()
- else if (header.msg_type == ExecuteReply.toTypeString &&
- !statusReceived)
- msgCount.incrementAndGet()
- statusReceived || (msgCount.get() >= 2)
- }
- }
-
- val f2 = Future { // records how many relay messages had been counted when a status message is seen
- statusDispatchProbe.fishForMessage(4.seconds) {
- case (status, header) =>
- if (status == KernelStatusIdle.toString)
- statusReceived = true
- statusMsgNum = msgCount.get()
- statusReceived || (msgCount.get() >= 2)
- }
- }
- val fs = (f1 zip f2)
- Await.ready(fs, 5.seconds)
-
- statusMsgNum should equal(2)
- }
-
- it("should send an execute input message") {
- handlerActor ! MockExecuteRequestKernelMessage
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- case KernelMessage(_, _, header, _, _, _) =>
- header.msg_type == ExecuteInput.toTypeString
- }
- }
-
- it("should send a message with ids equal to the incoming " +
- "KernelMessage's ids") {
- handlerActor ! MockExecuteRequestKernelMessage
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- case KernelMessage(ids, _, _, _, _, _) =>
- ids == MockExecuteRequestKernelMessage.ids
- }
- }
-
- it("should send a message with parent header equal to the incoming " +
- "KernelMessage's header") {
- handlerActor ! MockExecuteRequestKernelMessage
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- case KernelMessage(_, _, _, parentHeader, _, _) =>
- parentHeader == MockExecuteRequestKernelMessage.header
- }
- }
-
- // TODO: Investigate if this is still relevant at all
-// it("should send a status busy and idle message") {
-// handlerActor ! MockExecuteRequestKernelMessage
-// replyToHandlerWithOkAndResult()
-// var busy = false
-// var idle = false
-//
-// statusDispatchProbe.receiveWhile(100.milliseconds) {
-// case Tuple2(status: KernelStatusType, header: Header)=>
-// if(status == KernelStatusType.Busy)
-// busy = true
-// if(status == KernelStatusType.Idle)
-// idle = true
-// }
-//
-// idle should be (true)
-// busy should be (true)
-// }
- }
- }
-
- // Testing the error responses sent when the request content cannot be parsed
- describe("ExecuteRequestHandler( ActorLoader )") {
- describe("#receive( KernelMessage with bad JSON content )"){
- it("should respond with an execute_reply with status error") {
- handlerActor ! MockKernelMessageWithBadExecuteRequest
-
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- // Only mark as successful if this specific message was received
- case KernelMessage(_, _, header, _, _, contentString)
- if header.msg_type == ExecuteReply.toTypeString =>
- val reply = Json.parse(contentString).as[ExecuteReply]
- reply.status == "error"
- case _ => false
- }
- }
-
- it("should send error message to relay") {
- handlerActor ! MockKernelMessageWithBadExecuteRequest
-
- kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
- // Only mark as successful if this specific message was received
- case KernelMessage(_, _, header, _, _, _)
- if header.msg_type == ErrorContent.toTypeString => true
- case _ => false
- }
- }
-
- // TODO: Investigate if this is still relevant at all
-// it("should send a status idle message") {
-// handlerActor ! MockKernelMessageWithBadExecuteRequest
-// var busy = false
-// var idle = false
-//
-// statusDispatchProbe.receiveWhile(100.milliseconds) {
-// case Tuple2(status: KernelStatusType, header: Header)=>
-// if(status == KernelStatusType.Busy)
-// busy = true
-// if(status == KernelStatusType.Idle)
-// idle = true
-// }
-//
-// idle should be (true)
-// busy should be (false)
-// }
- }
- }
-}