You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:36 UTC
[28/51] [abbrv] incubator-toree git commit: Moved scala files to new
locations based on new package
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConfig.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConfig.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConfig.scala
new file mode 100644
index 0000000..6f3789d
--- /dev/null
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConfig.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import com.typesafe.config.Config
+import play.api.libs.json.Json
+
+/**
+ * Connection settings for the sockets used to talk to a kernel.
+ *
+ * The snake_case field names double as keys: the companion object's
+ * `Json.reads`/`Json.writes` instances and its `fromConfig` method address
+ * the values by exactly these names.
+ *
+ * @param stdin_port       Port number of the stdin socket
+ * @param control_port     Port number of the control socket
+ * @param hb_port          Port number of the heartbeat socket
+ * @param shell_port       Port number of the shell socket
+ * @param iopub_port       Port number of the iopub socket
+ * @param ip               Host name or ip address shared by all sockets
+ * @param transport        Transport/protocol portion of the socket addresses
+ * @param signature_scheme Presumably the scheme used to sign messages
+ *                         (TODO confirm against the code consuming it)
+ * @param key              Presumably the key used to sign messages
+ *                         (TODO confirm against the code consuming it)
+ */
+case class SocketConfig(
+  stdin_port: Int,
+  control_port: Int,
+  hb_port: Int,
+  shell_port: Int,
+  iopub_port: Int,
+  ip: String,
+  transport: String,
+  signature_scheme: String,
+  key: String
+)
+
+object SocketConfig {
+  import play.api.libs.json.{Reads, Writes}
+
+  // Public implicits carry explicit types so that implicit resolution does
+  // not depend on the order in which definitions are type-checked.
+  /** JSON deserializer generated from the fields of [[SocketConfig]]. */
+  implicit val socketConfigReads: Reads[SocketConfig] =
+    Json.reads[SocketConfig]
+
+  /** JSON serializer generated from the fields of [[SocketConfig]]. */
+  implicit val socketConfigWrites: Writes[SocketConfig] =
+    Json.writes[SocketConfig]
+
+  /**
+   * Builds a [[SocketConfig]] from a Typesafe `Config`, reading one entry per
+   * field under the key named exactly like that field.
+   *
+   * @param config The configuration holding the nine socket settings
+   *
+   * @return The populated socket configuration
+   *
+   * @note Any exception raised by `config.getInt`/`config.getString` (for
+   *       example for a missing or wrongly typed key) propagates unchanged.
+   */
+  def fromConfig(config: Config): SocketConfig =
+    SocketConfig(
+      stdin_port = config.getInt("stdin_port"),
+      control_port = config.getInt("control_port"),
+      hb_port = config.getInt("hb_port"),
+      shell_port = config.getInt("shell_port"),
+      iopub_port = config.getInt("iopub_port"),
+      ip = config.getString("ip"),
+      transport = config.getString("transport"),
+      signature_scheme = config.getString("signature_scheme"),
+      key = config.getString("key")
+    )
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConnection.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConnection.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConnection.scala
new file mode 100644
index 0000000..fae1d0e
--- /dev/null
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConnection.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+object SocketConnection {
+  /**
+   * Factory shorthand for `new SocketConnection(protocol, ip, port)`.
+   *
+   * @param protocol The protocol portion of the connection
+   * @param ip The hostname or ip address portion of the connection
+   * @param port The port portion of the connection
+   *
+   * @return A new connection wrapping the three parts
+   */
+  def apply(protocol: String, ip: String, port: Int): SocketConnection = {
+    val connection = new SocketConnection(protocol, ip, port)
+    connection
+  }
+}
+
+/**
+ * Represents a connection string for a socket, rendered by `toString` as
+ * `protocol://ip:port`.
+ *
+ * @param protocol The protocol portion of the connection (e.g. tcp, akka, udp)
+ * @param ip The hostname or ip address portion (e.g. *, myhost, 127.0.0.1)
+ * @param port The port portion of the connection
+ */
+class SocketConnection(protocol: String, ip: String, port: Int) {
+  /**
+   * Renders the connection as `protocol://ip:port`.
+   *
+   * Uses string interpolation instead of `String.format`: the `%d`
+   * conversion is formatted with the JVM's default locale and may emit
+   * locale-specific digits, which would corrupt the port in the address.
+   *
+   * @return The connection string
+   */
+  override def toString: String = s"$protocol://$ip:$port"
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketFactory.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketFactory.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketFactory.scala
new file mode 100644
index 0000000..07b05d7
--- /dev/null
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketFactory.scala
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import java.util.UUID
+
+import akka.actor.{Props, ActorRef, ActorSystem}
+import com.ibm.spark.communication.actors.{DealerSocketActor, ReqSocketActor, SubSocketActor}
+
+object SocketFactory {
+  /**
+   * Factory shorthand for `new SocketFactory(socketConfig)`.
+   *
+   * @param socketConfig The configuration handed to the new factory
+   *
+   * @return A new factory backed by the given configuration
+   */
+  def apply(socketConfig: SocketConfig): SocketFactory =
+    new SocketFactory(socketConfig)
+}
+
+/**
+ * A factory class that creates the client-side socket actors for the
+ * channels of the IPython Kernel Spec.
+ *
+ * @param socketConfig The configuration supplying the transport, ip and
+ *                     ports from which every connection string is built
+ */
+class SocketFactory(socketConfig: SocketConfig) {
+  /**
+   * Represents the identity meant to be shared between Shell and Stdin
+   * connections.
+   *
+   * NOTE: nothing in this class reads this value; the socket actors below
+   * are created without an identity being passed to them.
+   */
+  private val ZmqIdentity = UUID.randomUUID().toString
+
+  /** Connection built from the configured heartbeat port. */
+  val HeartbeatConnection: SocketConnection =
+    connectionOn(socketConfig.hb_port)
+
+  /** Connection built from the configured shell port. */
+  val ShellConnection: SocketConnection =
+    connectionOn(socketConfig.shell_port)
+
+  /** Connection built from the configured iopub port. */
+  val IOPubConnection: SocketConnection =
+    connectionOn(socketConfig.iopub_port)
+
+  /** Connection built from the configured stdin port. */
+  val StdinConnection: SocketConnection =
+    connectionOn(socketConfig.stdin_port)
+
+  /**
+   * Builds a connection that combines the configured transport and ip with
+   * the given port.
+   */
+  private def connectionOn(port: Int): SocketConnection =
+    SocketConnection(socketConfig.transport, socketConfig.ip, port)
+
+  /**
+   * Starts an actor of the given class in the given system, passing the
+   * connection string and the listener as its constructor arguments.
+   */
+  private def startSocketActor(
+    system: ActorSystem,
+    actorClass: Class[_],
+    connection: SocketConnection,
+    listener: ActorRef
+  ): ActorRef =
+    system.actorOf(Props(actorClass, connection.toString, listener))
+
+  /**
+   * Creates a [[ReqSocketActor]] representing the client endpoint for
+   * heartbeat messages, using [[HeartbeatConnection]].
+   *
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   *
+   * @return The ActorRef created for the socket connection
+   */
+  def HeartbeatClient(system: ActorSystem, listener: ActorRef): ActorRef =
+    startSocketActor(
+      system, classOf[ReqSocketActor], HeartbeatConnection, listener)
+
+  /**
+   * Creates a [[DealerSocketActor]] representing the client endpoint for
+   * shell messages, using [[ShellConnection]].
+   *
+   * NOTE: no identity for
+   * <a href="http://api.zeromq.org/2-1:zmq-setsockopt#toc6">
+   * Router/Dealer message routing</a> is set on the socket here.
+   *
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   *
+   * @return The ActorRef created for the socket connection
+   */
+  def ShellClient(system: ActorSystem, listener: ActorRef): ActorRef =
+    startSocketActor(
+      system, classOf[DealerSocketActor], ShellConnection, listener)
+
+  /**
+   * Creates a [[DealerSocketActor]] representing the client endpoint for
+   * stdin messages, using [[StdinConnection]].
+   *
+   * NOTE: no identity for
+   * <a href="http://api.zeromq.org/2-1:zmq-setsockopt#toc6">
+   * Router/Dealer message routing</a> is set on the socket here.
+   *
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   *
+   * @return The ActorRef created for the socket connection
+   */
+  def StdinClient(system: ActorSystem, listener: ActorRef): ActorRef =
+    startSocketActor(
+      system, classOf[DealerSocketActor], StdinConnection, listener)
+
+  /**
+   * Creates a [[SubSocketActor]] representing the client endpoint for IOPub
+   * messages, using [[IOPubConnection]].
+   *
+   * NOTE: no subscription is configured here; presumably the actor itself
+   * subscribes - verify against SubSocketActor.
+   *
+   * @param system The actor system the socket actor will belong to
+   * @param listener The actor handed to the socket actor as its listener
+   *
+   * @return The ActorRef created for the socket connection
+   */
+  def IOPubClient(system: ActorSystem, listener: ActorRef): ActorRef =
+    startSocketActor(
+      system, classOf[SubSocketActor], IOPubConnection, listener)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClient.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClient.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClient.scala
new file mode 100644
index 0000000..22c6071
--- /dev/null
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClient.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import akka.actor.Actor
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.communication.security.SecurityActorType
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.{HeaderBuilder, KMBuilder, KernelMessage}
+import com.ibm.spark.kernel.protocol.v5.content.{InputReply, InputRequest}
+import com.ibm.spark.utils.LogLike
+import com.ibm.spark.kernel.protocol.v5.client.Utilities._
+import play.api.libs.json.Json
+
+import StdinClient._
+import akka.pattern.ask
+
+import scala.concurrent.duration._
+import scala.concurrent.Await
+
+object StdinClient {
+  /**
+   * Shape of the handler that produces the text answering an input request:
+   * it takes a String and a Boolean and returns the reply String.
+   */
+  type ResponseFunction = (String, Boolean) => String
+
+  /** Handler that ignores both arguments and answers with an empty string. */
+  val EmptyResponseFunction: ResponseFunction =
+    (_: String, _: Boolean) => ""
+}
+
+/**
+ * The client endpoint for Stdin messages specified in the IPython Kernel Spec.
+ *
+ * Reacts to two kinds of messages: a ResponseFunction replaces the handler
+ * used to answer later input requests, and a ZMQMessage carrying an input
+ * request is answered with an input reply written to the stdin socket.
+ *
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ * @param actorLoader The loader used to retrieve actors (used here to look
+ *                    up the signature manager)
+ * @param signatureEnabled Whether replies are passed through the signature
+ *                         manager before being sent
+ */
+class StdinClient(
+  socketFactory: SocketFactory,
+  actorLoader: ActorLoader,
+  signatureEnabled: Boolean
+) extends Actor with LogLike {
+  logger.debug("Created stdin client actor")
+
+  private val socket = socketFactory.StdinClient(context.system, self)
+
+  /**
+   * The function to use for generating a response from an input_request
+   * message. Replaced whenever this actor receives a ResponseFunction.
+   */
+  private var responseFunc: ResponseFunction = EmptyResponseFunction
+
+  override def receive: Receive = {
+    // NOTE: ResponseFunction is an alias of a function type, so at runtime
+    // this case accepts any two-argument function (type arguments are erased).
+    case replacement: ResponseFunction =>
+      logger.debug("Updating response function")
+      responseFunc = replacement
+
+    case message: ZMQMessage =>
+      logger.debug("Received stdin kernel message")
+      // The type ascription converts the ZMQMessage into a KernelMessage.
+      val incoming: KernelMessage = message
+      val incomingType = incoming.header.msg_type
+
+      if (incomingType != InputRequest.toTypeString) {
+        logger.debug(s"Unknown message of type $incomingType")
+      } else {
+        answerInputRequest(incoming)
+      }
+  }
+
+  /**
+   * Answers an input request: runs the current response function on the
+   * request's prompt and password flag, wraps the result in an input reply
+   * whose parent is the request, optionally has it signed, and sends it to
+   * the stdin socket.
+   *
+   * @param request The kernel message whose content is an input request
+   */
+  private def answerInputRequest(request: KernelMessage): Unit = {
+    logger.debug("Message is an input request")
+
+    val parsed = Json.parse(request.contentString).as[InputRequest]
+    val reply = InputReply(responseFunc(parsed.prompt, parsed.password))
+
+    val unsignedReply = KMBuilder()
+      .withParent(request)
+      .withHeader(HeaderBuilder.empty.copy(
+        msg_type = InputReply.toTypeString,
+        session = getSessionId
+      ))
+      .withContentString(reply)
+      .build
+
+    import scala.concurrent.ExecutionContext.Implicits.global
+    val outgoingReply =
+      if (!signatureEnabled) {
+        unsignedReply
+      } else {
+        val signatureManager =
+          actorLoader.load(SecurityActorType.SignatureManager)
+        val pendingSignature = signatureManager ? unsignedReply
+        // Blocks this actor for up to 100 ms; a timeout surfaces as an
+        // exception thrown out of receive.
+        Await.result(pendingSignature, 100.milliseconds)
+          .asInstanceOf[KernelMessage]
+      }
+
+    // The type ascription converts the KernelMessage back into a ZMQMessage.
+    val zmqReply: ZMQMessage = outgoingReply
+
+    socket ! zmqReply
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/comm/ClientCommManagerSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/comm/ClientCommManagerSpec.scala b/client/src/test/scala/com/ibm/spark/comm/ClientCommManagerSpec.scala
deleted file mode 100644
index 85ef4aa..0000000
--- a/client/src/test/scala/com/ibm/spark/comm/ClientCommManagerSpec.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.comm
-
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.content.CommContent
-import org.scalatest.mock.MockitoSugar
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
-
-class ClientCommManagerSpec extends FunSpec with Matchers with BeforeAndAfter
- with MockitoSugar
-{
- private val TestTargetName = "some target"
-
- private var mockActorLoader: ActorLoader = _
- private var mockKMBuilder: KMBuilder = _
- private var mockCommRegistrar: CommRegistrar = _
- private var clientCommManager: ClientCommManager = _
-
- private var generatedCommWriter: CommWriter = _
-
- before {
- mockActorLoader = mock[ActorLoader]
- mockKMBuilder = mock[KMBuilder]
- mockCommRegistrar = mock[CommRegistrar]
-
- clientCommManager = new ClientCommManager(
- mockActorLoader,
- mockKMBuilder,
- mockCommRegistrar
- ) {
- override protected def newCommWriter(commId: UUID): CommWriter = {
- val commWriter = super.newCommWriter(commId)
-
- generatedCommWriter = commWriter
-
- val spyCommWriter = spy(commWriter)
- doNothing().when(spyCommWriter)
- .sendCommKernelMessage(any[KernelMessageContent with CommContent])
-
- spyCommWriter
- }
- }
- }
-
- describe("ClientCommManager") {
- describe("#open") {
- it("should return a wrapped instance of ClientCommWriter") {
- clientCommManager.open(TestTargetName, v5.MsgData.Empty)
-
- // Exposed hackishly for testing
- generatedCommWriter shouldBe a [ClientCommWriter]
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/comm/ClientCommWriterSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/comm/ClientCommWriterSpec.scala b/client/src/test/scala/com/ibm/spark/comm/ClientCommWriterSpec.scala
deleted file mode 100644
index f5a4b43..0000000
--- a/client/src/test/scala/com/ibm/spark/comm/ClientCommWriterSpec.scala
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.comm
-
-import java.util.UUID
-
-import akka.actor.{ActorSelection, ActorSystem}
-import akka.testkit.{TestKit, TestProbe}
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.content._
-import com.typesafe.config.ConfigFactory
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import play.api.libs.json.Json
-
-import scala.concurrent.duration._
-
-object ClientCommWriterSpec {
- val config ="""
- akka {
- loglevel = "WARNING"
- }"""
-}
-
-class ClientCommWriterSpec extends TestKit(
- ActorSystem("ClientCommWriterSpec",
- ConfigFactory.parseString(ClientCommWriterSpec.config))
-) with FunSpecLike with Matchers with BeforeAndAfter with MockitoSugar
-{
-
- private val commId = UUID.randomUUID().toString
- private var clientCommWriter: ClientCommWriter = _
- private var kernelMessageBuilder: KMBuilder = _
-
- private var actorLoader: ActorLoader = _
- private var shellSocketProbe: TestProbe = _
-
- /**
- * Retrieves the next message available.
- *
- * @return The KernelMessage instance (or an error if timed out)
- */
- private def getNextMessage =
- shellSocketProbe.receiveOne(200.milliseconds)
- .asInstanceOf[KernelMessage]
-
- /**
- * Retrieves the next message available and returns its type.
- *
- * @return The type of the message (pulled from message header)
- */
- private def getNextMessageType = getNextMessage.header.msg_type
-
- /**
- * Retrieves the next message available and parses the content string.
- *
- * @tparam T The type to coerce the content string into
- *
- * @return The resulting KernelMessageContent instance
- */
- private def getNextMessageContents[T <: KernelMessageContent]
- (implicit fjs: play.api.libs.json.Reads[T], mf: Manifest[T]) =
- {
- val receivedMessage = getNextMessage
-
- Json.parse(receivedMessage.contentString).as[T]
- }
-
- before {
- kernelMessageBuilder = spy(KMBuilder())
-
- // Construct path for kernel message relay
- actorLoader = mock[ActorLoader]
- shellSocketProbe = TestProbe()
- val shellSocketSelection: ActorSelection =
- system.actorSelection(shellSocketProbe.ref.path.toString)
- doReturn(shellSocketSelection)
- .when(actorLoader).load(SocketType.ShellClient)
-
- // Create a new writer to use for testing
- clientCommWriter =
- new ClientCommWriter(actorLoader, kernelMessageBuilder, commId)
- }
-
- describe("ClientCommWriter") {
- describe("#writeOpen") {
- it("should send a comm_open message to the relay") {
- clientCommWriter.writeOpen(anyString())
-
- getNextMessageType should be (CommOpen.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- clientCommWriter.writeOpen(anyString())
-
- val actual = getNextMessageContents[CommOpen].comm_id
-
- actual should be (expected)
- }
-
- it("should include the target name in the message") {
- val expected = "<TARGET_NAME>"
- clientCommWriter.writeOpen(expected)
-
- val actual = getNextMessageContents[CommOpen].target_name
-
- actual should be (expected)
- }
-
- it("should provide empty data in the message if no data is provided") {
- val expected = MsgData.Empty
- clientCommWriter.writeOpen(anyString())
-
- val actual = getNextMessageContents[CommOpen].data
-
- actual should be (expected)
- }
-
- it("should include the data in the message") {
- val expected = MsgData("some key" -> "some value")
- clientCommWriter.writeOpen(anyString(), expected)
-
- val actual = getNextMessageContents[CommOpen].data
-
- actual should be (expected)
- }
- }
-
- describe("#writeMsg") {
- it("should send a comm_msg message to the relay") {
- clientCommWriter.writeMsg(MsgData.Empty)
-
- getNextMessageType should be (CommMsg.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- clientCommWriter.writeMsg(MsgData.Empty)
-
- val actual = getNextMessageContents[CommMsg].comm_id
-
- actual should be (expected)
- }
-
- it("should fail a require if the data is null") {
- intercept[IllegalArgumentException] {
- clientCommWriter.writeMsg(null)
- }
- }
-
- it("should include the data in the message") {
- val expected = MsgData("some key" -> "some value")
- clientCommWriter.writeMsg(expected)
-
- val actual = getNextMessageContents[CommMsg].data
-
- actual should be (expected)
- }
- }
-
- describe("#writeClose") {
- it("should send a comm_close message to the relay") {
- clientCommWriter.writeClose()
-
- getNextMessageType should be (CommClose.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- clientCommWriter.writeClose()
-
- val actual = getNextMessageContents[CommClose].comm_id
-
- actual should be (expected)
- }
-
- it("should provide empty data in the message if no data is provided") {
- val expected = MsgData.Empty
- clientCommWriter.writeClose()
-
- val actual = getNextMessageContents[CommClose].data
-
- actual should be (expected)
- }
-
- it("should include the data in the message") {
- val expected = MsgData("some key" -> "some value")
- clientCommWriter.writeClose(expected)
-
- val actual = getNextMessageContents[CommClose].data
-
- actual should be (expected)
- }
- }
-
- describe("#write") {
- it("should send a comm_msg message to the relay") {
- clientCommWriter.write(Array('a'), 0, 1)
-
- getNextMessageType should be (CommMsg.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- clientCommWriter.write(Array('a'), 0, 1)
-
- val actual = getNextMessageContents[CommMsg].comm_id
-
- actual should be (expected)
- }
-
- it("should package the string as part of the data with a 'message' key") {
- val expected = MsgData("message" -> "a")
- clientCommWriter.write(Array('a'), 0, 1)
-
- val actual = getNextMessageContents[CommMsg].data
-
- actual should be (expected)
- }
- }
-
- describe("#flush") {
- it("should do nothing") {
- // TODO: Is this test necessary? It does nothing.
- clientCommWriter.flush()
- }
- }
-
- describe("#close") {
- it("should send a comm_close message to the relay") {
- clientCommWriter.close()
-
- getNextMessageType should be (CommClose.toTypeString)
- }
-
- it("should include the comm_id in the message") {
- val expected = commId
- clientCommWriter.close()
-
- val actual = getNextMessageContents[CommClose].comm_id
-
- actual should be (expected)
- }
-
- it("should provide empty data in the message") {
- val expected = MsgData.Empty
- clientCommWriter.close()
-
- val actual = getNextMessageContents[CommClose].data
-
- actual should be (expected)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/SparkKernelClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/SparkKernelClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/SparkKernelClientSpec.scala
deleted file mode 100644
index 8db29f8..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/SparkKernelClientSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client
-
-import akka.actor.ActorSystem
-import akka.testkit.{TestKit, TestProbe}
-import com.ibm.spark.comm.{CommCallbacks, CommStorage, CommRegistrar}
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.execution.ExecuteRequestTuple
-import scala.concurrent.duration._
-import org.mockito.Mockito._
-import org.mockito.Matchers.{eq => mockEq, _}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-
-class SparkKernelClientSpec
- extends TestKit(ActorSystem("SparkKernelClientActorSystem"))
- with Matchers with MockitoSugar with FunSpecLike with BeforeAndAfter
-{
- private val TestTargetName = "some target"
-
- private var mockActorLoader: ActorLoader = _
- private var mockCommRegistrar: CommRegistrar = _
- private var sparkKernelClient: SparkKernelClient = _
- private var executeRequestProbe: TestProbe = _
- private var shellClientProbe: TestProbe = _
-
- before {
- mockActorLoader = mock[ActorLoader]
- mockCommRegistrar = mock[CommRegistrar]
-
- executeRequestProbe = TestProbe()
- when(mockActorLoader.load(MessageType.Incoming.ExecuteRequest))
- .thenReturn(system.actorSelection(executeRequestProbe.ref.path.toString))
-
- shellClientProbe = TestProbe()
- when(mockActorLoader.load(SocketType.ShellClient))
- .thenReturn(system.actorSelection(shellClientProbe.ref.path.toString))
-
- sparkKernelClient = new SparkKernelClient(
- mockActorLoader, system, mockCommRegistrar)
- }
-
- describe("SparkKernelClient") {
- describe("#execute") {
- it("should send an ExecuteRequest message") {
- val func = (x: Any) => println(x)
- sparkKernelClient.execute("val foo = 2")
- executeRequestProbe.expectMsgClass(classOf[ExecuteRequestTuple])
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
deleted file mode 100644
index db3205c..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.execution
-
-import com.ibm.spark.kernel.protocol.v5.content.{StreamContent, ExecuteResult}
-import com.ibm.spark.kernel.protocol.v5.content._
-import com.ibm.spark.kernel.protocol.v5._
-import org.scalatest.{Matchers, FunSpec}
-import org.scalatest.concurrent.ScalaFutures
-
-import scala.concurrent.Promise
-import scala.util.{Try, Failure, Success}
-
-object DeferredExecutionTest {
- val executeReplyError = Option(ExecuteReplyError(1, None, None, None))
- val executeReplyOk = Option(ExecuteReplyOk(1, None, None))
- val executeResult = Option(ExecuteResult(1, Data(), Metadata()))
-
- def mockSendingKernelMessages(deferredExecution: DeferredExecution,
- executeReplyOption: Option[ExecuteReply],
- executeResultOption: Option[ExecuteResult]): Unit = {
- // Mock behaviour of the kernel sending messages to us
- executeResultOption.map {
- executeResult =>
- deferredExecution.resolveResult(executeResult)
- }
- executeReplyOption.map {
- executeReply=>
- deferredExecution.resolveReply(executeReply)
- }
- }
-
- def processExecuteResult(executeResult: ExecuteResult)
- (implicit promise: Promise[Try[ExecuteResultPromise]]): Unit = {
- promise.success(Success(new ExecuteResultPromise))
- }
-
- def processExecuteReply(executeReplyError: ExecuteReplyError)
- (implicit promise: Promise[Try[ExecuteReplyPromise]]): Unit = {
- promise.success(Success(new ExecuteReplyPromise))
- }
-
- def processOnSuccessfulCompletion(executeReplyOk: ExecuteReplyOk)
- (implicit promise: Promise[Try[String]]): Unit = {
- promise.success(Success("Success"))
- }
-}
-
-class ExecuteResultPromise {}
-class ExecuteReplyPromise {}
-
-class DeferredExecutionTest extends FunSpec with ScalaFutures with Matchers {
- import DeferredExecutionTest._
- describe("DeferredExecution") {
- describe("onResult( callback )"){
- it("should run all onResult callbacks when ExecuteResult and " +
- "successful ExecuteReply are returned") {
- implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
- val deferredExecution: DeferredExecution = DeferredExecution()
- .onResult(processExecuteResult)
-
- mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
-
- whenReady(executeResultPromise.future) {
- case Success(v) => assert(true)
- case Failure(exception: Throwable) =>
- fail("Promise resolved with failure when processing " +
- "execute result.", exception)
- case unknownValue=>
- fail(s"Promised resolved with unknown value: ${unknownValue}")
- }
- }
- it("should run all onResultCallbacks registered after deferred has " +
- "been resolved") {
- val executeResultPromise: Promise[Int] = Promise()
- var counter = 0
- def processExecuteResult (executeResult: ExecuteResult) : Unit = {
- counter = counter + 1
- // Hack to allow callbacks to occur after meeting our assertion value
- if(counter == 2) {
- Thread.sleep(500)
- executeResultPromise.success(counter)
- }
- }
-
- val deferredExecution: DeferredExecution = DeferredExecution()
- .onResult(processExecuteResult)
-
- mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
- // register callback after code execution has completed
- deferredExecution.onResult(processExecuteResult)
-
- whenReady(executeResultPromise.future){ _ => counter should be(2) }
-
- }
- it("should not run onResult callbacks when ExecuteReply is a failure") {
- // This promise should be resolved by the deferred
- implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
- // This promise should not be resolved by the deferred
- implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
-
- val deferredExecution: DeferredExecution = DeferredExecution()
- .onError(processExecuteReply)
- .onResult(processExecuteResult)
- mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
-
- whenReady(executeReplyPromise.future) { _ =>
- executeResultPromise.isCompleted should be(false)
- }
- }
- }
- describe("onStream( callback )"){
- it("should execute all streaming callbacks") {
- var counter = 0
- val streamingResultPromise: Promise[Int] = Promise()
- def processStreamContent (streamContent: StreamContent) : Unit = {
- counter = counter + 1
- if (counter == 4)
- streamingResultPromise.success(counter)
- }
-
- val deferredExecution: DeferredExecution = DeferredExecution()
- .onStream(processStreamContent)
- .onStream(processStreamContent)
-
- deferredExecution.emitStreamContent(StreamContent("stdout","msg"))
- deferredExecution.emitStreamContent(StreamContent("stdout","msg2"))
-
- whenReady(streamingResultPromise.future){ counterValue =>
- counterValue should be(4)
- }
- }
- }
-
- describe("onError( callback )") {
- it("should run all onError callbacks") {
- implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
- val deferredExecution: DeferredExecution = DeferredExecution()
- .onError(processExecuteReply)
-
- mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
-
- whenReady(executeReplyPromise.future) {
- case Success(v) => assert(true)
- case Failure(exception: Throwable) =>
- fail("Promised resolved with failure while trying to " +
- "process execute result.", exception)
- case unknownValue=>
- fail(s"Promised resolved with unknown value: ${unknownValue}")
- }
- }
- it("should not run onError callbacks when ExecuteReply is a success") {
- // This promise and callback should not be executed by the deferred
- implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
- // This promise and callback should be executed by the deferred
- implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
-
- val deferredExecution: DeferredExecution = DeferredExecution()
- .onError(processExecuteReply)
- .onResult(processExecuteResult)
-
- mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
-
- whenReady(executeResultPromise.future) {
- case _ =>
- executeReplyPromise.isCompleted should be(false)
- }
- }
- }
- describe("onSuccessfulCompletion( callback )") {
- it("should run all onSuccessfulCompletion callbacks on ExecuteReplyOk and ExecuteResult") {
- implicit val executeCompletePromise: Promise[Try[String]] = Promise()
- val deferredExecution: DeferredExecution = DeferredExecution()
- .onSuccess(processOnSuccessfulCompletion)
-
- mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
-
- whenReady(executeCompletePromise.future) {
- case Success(s) => // Nothing to do for the successful case
- case Failure(exception: Throwable) =>
- fail("Promised resolved with failure while trying to " +
- "process execute result.", exception)
- case unknownValue=>
- fail(s"Promised resolved with unknown value: ${unknownValue}")
- }
- }
- it("should run all onSuccessfulCompletion callbacks on ExecuteReplyOk and None") {
- implicit val executeCompletePromise: Promise[Try[String]] = Promise()
- val deferredExecution: DeferredExecution = DeferredExecution()
- .onSuccess(processOnSuccessfulCompletion)
-
- mockSendingKernelMessages(deferredExecution, executeReplyOk, None)
-
- whenReady(executeCompletePromise.future) {
- case Success(s) => // Nothing to do for the successful case
- case Failure(exception: Throwable) =>
- fail("Promised resolved with failure while trying to " +
- "process execute result.", exception)
- case unknownValue=>
- fail(s"Promised resolved with unknown value: ${unknownValue}")
- }
- }
- it("should not run onSuccessfulCompletion callbacks on ExecuteReplyError") {
- implicit val executeCompletePromise: Promise[Try[String]] = Promise()
- // This promise and callback should not be executed by the deferred
- implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
-
- val deferredExecution: DeferredExecution = DeferredExecution()
- .onError(processExecuteReply)
- .onSuccess(processOnSuccessfulCompletion)
-
- mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
-
- whenReady(executeReplyPromise.future) {
- case _ =>
- executeCompletePromise.isCompleted should be(false)
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
deleted file mode 100644
index 9fdd702..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.socket
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.{TestProbe, ImplicitSender, TestKit}
-import com.ibm.spark.communication.ZMQMessage
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSpecLike}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-
-class HeartbeatClientSpec extends TestKit(ActorSystem("HeartbeatActorSpec"))
- with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
-
- describe("HeartbeatClientActor") {
- val socketFactory = mock[SocketFactory]
- val mockActorLoader = mock[ActorLoader]
- val probe : TestProbe = TestProbe()
- when(socketFactory.HeartbeatClient(any(classOf[ActorSystem]), any(classOf[ActorRef]))).thenReturn(probe.ref)
-
- val heartbeatClient = system.actorOf(Props(
- classOf[HeartbeatClient], socketFactory, mockActorLoader, true
- ))
-
- describe("send heartbeat") {
- it("should send ping ZMQMessage") {
- heartbeatClient ! HeartbeatMessage
- probe.expectMsgClass(classOf[ZMQMessage])
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/IOPubClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
deleted file mode 100644
index b592dcd..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.socket
-
-import java.util.UUID
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.pattern.ask
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import akka.util.Timeout
-import com.ibm.spark.comm.{CommCallbacks, CommRegistrar, CommStorage, CommWriter}
-import com.ibm.spark.communication.ZMQMessage
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.Utilities._
-import com.ibm.spark.kernel.protocol.v5.client.execution.{DeferredExecution, DeferredExecutionManager}
-import com.ibm.spark.kernel.protocol.v5.client.{ActorLoader, Utilities}
-import com.ibm.spark.kernel.protocol.v5.content.{CommClose, CommMsg, CommOpen, StreamContent}
-import com.typesafe.config.ConfigFactory
-import org.mockito.Matchers.{eq => mockEq, _}
-import org.mockito.Mockito._
-import org.scalatest.concurrent.{Eventually, ScalaFutures}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.time.{Milliseconds, Span}
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import play.api.libs.json.Json
-
-import scala.concurrent.duration._
-import scala.concurrent.{Future, Promise}
-import scala.util.Failure
-
-object IOPubClientSpec {
- val config ="""
- akka {
- loglevel = "WARNING"
- }"""
-}
-
-class IOPubClientSpec extends TestKit(ActorSystem(
- "IOPubClientSpecSystem", ConfigFactory.parseString(IOPubClientSpec.config)
-)) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
- with ScalaFutures with BeforeAndAfter with Eventually
-{
- private val TestTimeout = Timeout(10.seconds)
- implicit override val patienceConfig = PatienceConfig(
- timeout = scaled(Span(200, Milliseconds)),
- interval = scaled(Span(5, Milliseconds))
- )
- private val SignatureEnabled = true
-
- private var clientSocketProbe: TestProbe = _
- private var mockClientSocketFactory: SocketFactory = _
- private var mockActorLoader: ActorLoader = _
- private var mockCommRegistrar: CommRegistrar = _
- private var spyCommStorage: CommStorage = _
- private var mockCommCallbacks: CommCallbacks = _
- private var ioPubClient: ActorRef = _
-
- private var kmBuilder: KMBuilder = _
-
- private val id = UUID.randomUUID().toString
- private val TestTargetName = "some target"
- private val TestCommId = UUID.randomUUID().toString
-
- before {
- kmBuilder = KMBuilder()
- mockCommCallbacks = mock[CommCallbacks]
- mockCommRegistrar = mock[CommRegistrar]
-
- spyCommStorage = spy(new CommStorage())
-
- clientSocketProbe = TestProbe()
- mockActorLoader = mock[ActorLoader]
- mockClientSocketFactory = mock[SocketFactory]
-
- // Stub the return value for the socket factory
- when(mockClientSocketFactory.IOPubClient(anyObject(), any[ActorRef]))
- .thenReturn(clientSocketProbe.ref)
-
- // Construct the object we will test against
- ioPubClient = system.actorOf(Props(
- classOf[IOPubClient], mockClientSocketFactory, mockActorLoader,
- SignatureEnabled, mockCommRegistrar, spyCommStorage
- ))
- }
-
- describe("IOPubClient") {
- describe("#receive") {
- it("should execute all Comm open callbacks on comm_open message") {
- val message: ZMQMessage = kmBuilder
- .withHeader(CommOpen.toTypeString)
- .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
- .build
-
- // Mark as target being provided
- doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
- .getTargetCallbacks(anyString())
-
- // Simulate receiving a message from the kernel
- ioPubClient ! message
-
- // Check to see if "eventually" the callback is triggered
- eventually {
- verify(mockCommCallbacks).executeOpenCallbacks(
- any[CommWriter], mockEq(TestCommId),
- mockEq(TestTargetName), any[v5.MsgData])
- }
- }
-
- it("should not execute Comm open callbacks if the target is not found") {
- val message: ZMQMessage = kmBuilder
- .withHeader(CommOpen.toTypeString)
- .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
- .build
-
- // Mark as target NOT being provided
- doReturn(None).when(spyCommStorage).getTargetCallbacks(anyString())
-
- // Simulate receiving a message from the kernel
- ioPubClient ! message
-
- // Check to see if "eventually" the callback is NOT triggered
- eventually {
- // Check that we have checked if the target exists
- verify(spyCommStorage).getTargetCallbacks(TestTargetName)
-
- verify(mockCommCallbacks, never()).executeOpenCallbacks(
- any[CommWriter], mockEq(TestCommId),
- mockEq(TestTargetName), any[v5.MsgData])
- verify(mockCommRegistrar, never()).link(TestTargetName, TestCommId)
- }
- }
-
- it("should execute all Comm msg callbacks on comm_msg message") {
- val message: ZMQMessage = kmBuilder
- .withHeader(CommMsg.toTypeString)
- .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
- .build
-
- // Mark as target being provided
- doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
- .getCommIdCallbacks(any[v5.UUID])
-
- // Simulate receiving a message from the kernel
- ioPubClient ! message
-
- // Check to see if "eventually" the callback is triggered
- eventually {
- verify(mockCommCallbacks).executeMsgCallbacks(
- any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
- }
- }
-
- it("should not execute Comm msg callbacks if the Comm id is not found") {
- val message: ZMQMessage = kmBuilder
- .withHeader(CommMsg.toTypeString)
- .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
- .build
-
- // Mark as target NOT being provided
- doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID])
-
- // Simulate receiving a message from the kernel
- ioPubClient ! message
-
- // Check to see if "eventually" the callback is NOT triggered
- eventually {
- // Check that we have checked if the target exists
- verify(spyCommStorage).getCommIdCallbacks(TestCommId)
-
- verify(mockCommCallbacks, never()).executeMsgCallbacks(
- any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
- }
- }
-
- it("should execute all Comm close callbacks on comm_close message") {
- val message: ZMQMessage = kmBuilder
- .withHeader(CommClose.toTypeString)
- .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
- .build
-
- // Mark as target being provided
- doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
- .getCommIdCallbacks(any[v5.UUID])
-
- // Simulate receiving a message from the kernel
- ioPubClient ! message
-
- // Check to see if "eventually" the callback is triggered
- eventually {
- verify(mockCommCallbacks).executeCloseCallbacks(
- any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
- }
- }
-
- it("should not execute Comm close callbacks if Comm id is not found") {
- val message: ZMQMessage = kmBuilder
- .withHeader(CommClose.toTypeString)
- .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
- .build
-
- // Mark as target NOT being provided
- doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID])
-
- // Simulate receiving a message from the kernel
- ioPubClient ! message
-
- // Check to see if "eventually" the callback is NOT triggered
- eventually {
- // Check that we have checked if the target exists
- verify(spyCommStorage).getCommIdCallbacks(TestCommId)
-
- verify(mockCommCallbacks, never()).executeCloseCallbacks(
- any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
- }
- }
-
- it("should call a registered callback on stream message") {
- val result = StreamContent("foo", "bar")
- val header = Header(id, "spark", id,
- MessageType.Outgoing.Stream.toString, "5.0")
- val parentHeader = Header(id, "spark", id,
- MessageType.Incoming.ExecuteRequest.toString, "5.0")
-
- val kernelMessage = new KernelMessage(
- Seq[String](),
- "",
- header,
- parentHeader,
- Metadata(),
- Json.toJson(result).toString()
- )
- val promise: Promise[String] = Promise()
- val de: DeferredExecution = DeferredExecution().onStream(
- (content: StreamContent) => {
- promise.success(content.text)
- }
- )
- DeferredExecutionManager.add(id, de)
- // Send the message to the IOPubClient
- val zmqMessage: ZMQMessage = kernelMessage
-
- ioPubClient ! zmqMessage
-
- whenReady(promise.future) {
- case res: String =>
- res should be eq("bar")
- case _ =>
- fail(s"Received failure when asking IOPubClient")
- }
- }
-
- it("should not invoke callback when stream message's parent header is null") {
- // Construct the kernel message
- val result = StreamContent("foo", "bar")
- val header = Header(id, "spark", id,
- MessageType.Outgoing.Stream.toString, "5.0")
-
- val kernelMessage = new KernelMessage(
- Seq[String](),
- "",
- header,
- null,
- Metadata(),
- Json.toJson(result).toString()
- )
-
- // Send the message to the IOPubClient
- val zmqMessage: ZMQMessage = kernelMessage
- val futureResult: Future[Any] = ioPubClient.ask(zmqMessage)(TestTimeout)
- whenReady(futureResult) {
- case result: Failure[Any] =>
- // Getting the value of the failure will cause the underlying exception will be thrown
- try {
- result.get
- } catch {
- case t:RuntimeException =>
- t.getMessage should be("Parent Header was null in Kernel Message.")
- }
- case result =>
- fail(s"Did not receive failure!! ${result}")
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/ShellClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/ShellClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/ShellClientSpec.scala
deleted file mode 100644
index 0110dfd..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/ShellClientSpec.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.socket
-
-import java.util.UUID
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.{TestProbe, ImplicitSender, TestKit}
-import com.ibm.spark.communication.ZMQMessage
-import com.ibm.spark.communication.security.SecurityActorType
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.content.ExecuteRequest
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSpecLike}
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import play.api.libs.json.Json
-
-class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec"))
- with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
- private val SignatureEnabled = true
-
- describe("ShellClientActor") {
- val socketFactory = mock[SocketFactory]
- val mockActorLoader = mock[ActorLoader]
- val probe : TestProbe = TestProbe()
- when(socketFactory.ShellClient(
- any(classOf[ActorSystem]), any(classOf[ActorRef])
- )).thenReturn(probe.ref)
-
- val signatureManagerProbe = TestProbe()
- doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString))
- .when(mockActorLoader).load(SecurityActorType.SignatureManager)
-
- val shellClient = system.actorOf(Props(
- classOf[ShellClient], socketFactory, mockActorLoader, SignatureEnabled
- ))
-
- describe("send execute request") {
- it("should send execute request") {
- val request = ExecuteRequest(
- "foo", false, true, UserExpressions(), true
- )
- val header = Header(
- UUID.randomUUID().toString, "spark",
- UUID.randomUUID().toString, MessageType.Incoming.ExecuteRequest.toString,
- "5.0"
- )
- val kernelMessage = KernelMessage(
- Seq[String](), "",
- header, HeaderBuilder.empty,
- Metadata(), Json.toJson(request).toString
- )
- shellClient ! kernelMessage
-
- // Echo back the kernel message sent to have a signature injected
- signatureManagerProbe.expectMsgClass(classOf[KernelMessage])
- signatureManagerProbe.reply(kernelMessage)
-
- probe.expectMsgClass(classOf[ZMQMessage])
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/StdinClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/StdinClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/StdinClientSpec.scala
deleted file mode 100644
index 877c4f5..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/StdinClientSpec.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.socket
-
-import akka.actor.{ActorRef, Props, ActorSystem}
-import akka.testkit.{TestProbe, ImplicitSender, TestKit}
-import com.ibm.spark.communication.ZMQMessage
-import com.ibm.spark.communication.security.SecurityActorType
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.client.socket.StdinClient.ResponseFunction
-import com.ibm.spark.kernel.protocol.v5.content.{InputReply, InputRequest, ClearOutput, ExecuteRequest}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import com.ibm.spark.kernel.protocol.v5.client.Utilities._
-import play.api.libs.json.Json
-import scala.concurrent.duration._
-
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-class StdinClientSpec extends TestKit(ActorSystem("StdinActorSpec"))
- with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
- with BeforeAndAfter
-{
- private val SignatureEnabled = true
- private val TestReplyString = "some value"
- private val TestResponseFunc: ResponseFunction = (_, _) => TestReplyString
-
- private var mockSocketFactory: SocketFactory = _
- private var mockActorLoader: ActorLoader = _
- private var signatureManagerProbe: TestProbe = _
- private var socketProbe: TestProbe = _
- private var stdinClient: ActorRef = _
-
- before {
- socketProbe = TestProbe()
- signatureManagerProbe = TestProbe()
- mockSocketFactory = mock[SocketFactory]
- mockActorLoader = mock[ActorLoader]
- doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString))
- .when(mockActorLoader).load(SecurityActorType.SignatureManager)
- doReturn(socketProbe.ref).when(mockSocketFactory)
- .StdinClient(any[ActorSystem], any[ActorRef])
-
- stdinClient = system.actorOf(Props(
- classOf[StdinClient], mockSocketFactory, mockActorLoader, SignatureEnabled
- ))
-
- // Set the response function for our client socket
- stdinClient ! TestResponseFunc
- }
-
- describe("StdinClient") {
- describe("#receive") {
- it("should update the response function if receiving a new one") {
- val expected = "some other value"
- val replacementFunc: ResponseFunction = (_, _) => expected
-
- // Update the function
- stdinClient ! replacementFunc
-
- val inputRequestMessage: ZMQMessage = KMBuilder()
- .withHeader(InputRequest.toTypeString)
- .withContentString(InputRequest("", false))
- .build
-
- stdinClient ! inputRequestMessage
-
- // Echo back the kernel message sent to have a signature injected
- signatureManagerProbe.expectMsgPF() {
- case kernelMessage: KernelMessage =>
- signatureManagerProbe.reply(kernelMessage)
- true
- }
-
- socketProbe.expectMsgPF() {
- case zmqMessage: ZMQMessage =>
- val kernelMessage: KernelMessage = zmqMessage
- val inputReply =
- Json.parse(kernelMessage.contentString).as[InputReply]
- inputReply.value should be (expected)
- }
- }
-
- it("should do nothing if the incoming message is not an input_request") {
- val notInputRequestMessage: ZMQMessage = KMBuilder()
- .withHeader(ClearOutput.toTypeString)
- .build
-
- stdinClient ! notInputRequestMessage
-
- socketProbe.expectNoMsg(300.milliseconds)
- }
-
- it("should respond with an input_reply if the incoming message is " +
- "an input_request") {
- val inputRequestMessage: ZMQMessage = KMBuilder()
- .withHeader(InputRequest.toTypeString)
- .withContentString(InputRequest("", false))
- .build
-
- stdinClient ! inputRequestMessage
-
- // Echo back the kernel message sent to have a signature injected
- signatureManagerProbe.expectMsgPF() {
- case kernelMessage: KernelMessage =>
- signatureManagerProbe.reply(kernelMessage)
- true
- }
-
- socketProbe.expectMsgPF() {
- case zmqMessage: ZMQMessage =>
- val kernelMessage: KernelMessage = zmqMessage
- val messageType = kernelMessage.header.msg_type
- messageType should be (InputReply.toTypeString)
- }
- }
-
- it("should use the result from the response function if the incoming " +
- "message is an input_request") {
- val inputRequestMessage: ZMQMessage = KMBuilder()
- .withHeader(InputRequest.toTypeString)
- .withContentString(InputRequest("", false))
- .build
-
- stdinClient ! inputRequestMessage
-
- // Echo back the kernel message sent to have a signature injected
- signatureManagerProbe.expectMsgPF() {
- case kernelMessage: KernelMessage =>
- signatureManagerProbe.reply(kernelMessage)
- true
- }
-
- socketProbe.expectMsgPF() {
- case zmqMessage: ZMQMessage =>
- val kernelMessage: KernelMessage = zmqMessage
- val inputReply =
- Json.parse(kernelMessage.contentString).as[InputReply]
- inputReply.value should be (TestReplyString)
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/comm/ClientCommManagerSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/comm/ClientCommManagerSpec.scala b/client/src/test/scala/org/apache/toree/comm/ClientCommManagerSpec.scala
new file mode 100644
index 0000000..85ef4aa
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/comm/ClientCommManagerSpec.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.comm
+
+import com.ibm.spark.kernel.protocol.v5
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.content.CommContent
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
+
+/**
+ * Checks that the comm writers created by ClientCommManager#open are
+ * ClientCommWriter instances.
+ */
+class ClientCommManagerSpec extends FunSpec with Matchers with BeforeAndAfter
+  with MockitoSugar
+{
+  private val TestTargetName = "some target"
+
+  private var mockActorLoader: ActorLoader = _
+  private var mockKMBuilder: KMBuilder = _
+  private var mockCommRegistrar: CommRegistrar = _
+  private var clientCommManager: ClientCommManager = _
+
+  // Holds the writer produced by the manager before it gets wrapped in a spy
+  private var generatedCommWriter: CommWriter = _
+
+  before {
+    mockActorLoader = mock[ActorLoader]
+    mockKMBuilder = mock[KMBuilder]
+    mockCommRegistrar = mock[CommRegistrar]
+
+    clientCommManager = new ClientCommManager(
+      mockActorLoader, mockKMBuilder, mockCommRegistrar
+    ) {
+      override protected def newCommWriter(commId: UUID): CommWriter = {
+        // Record the real writer so the test can inspect its runtime type
+        val realWriter = super.newCommWriter(commId)
+        generatedCommWriter = realWriter
+
+        // Return a spy whose sendCommKernelMessage is stubbed to do nothing
+        val writerSpy = spy(realWriter)
+        doNothing().when(writerSpy)
+          .sendCommKernelMessage(any[KernelMessageContent with CommContent])
+        writerSpy
+      }
+    }
+  }
+
+  describe("ClientCommManager") {
+    describe("#open") {
+      it("should return a wrapped instance of ClientCommWriter") {
+        clientCommManager.open(TestTargetName, v5.MsgData.Empty)
+
+        // Exposed hackishly for testing
+        generatedCommWriter shouldBe a [ClientCommWriter]
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/comm/ClientCommWriterSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/comm/ClientCommWriterSpec.scala b/client/src/test/scala/org/apache/toree/comm/ClientCommWriterSpec.scala
new file mode 100644
index 0000000..f5a4b43
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/comm/ClientCommWriterSpec.scala
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.comm
+
+import java.util.UUID
+
+import akka.actor.{ActorSelection, ActorSystem}
+import akka.testkit.{TestKit, TestProbe}
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.typesafe.config.ConfigFactory
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
+import play.api.libs.json.Json
+
+import scala.concurrent.duration._
+
+/** Holds the Akka configuration string used to build the spec's ActorSystem. */
+object ClientCommWriterSpec {
+  // Parsed with ConfigFactory.parseString by the spec class; sets loglevel
+  val config =
+    """
+ akka {
+ loglevel = "WARNING"
+ }"""
+}
+
+/**
+ * Exercises ClientCommWriter against a TestProbe standing in for the shell
+ * client socket, checking the type and content of each message written.
+ */
+class ClientCommWriterSpec extends TestKit(
+  ActorSystem("ClientCommWriterSpec",
+    ConfigFactory.parseString(ClientCommWriterSpec.config))
+) with FunSpecLike with Matchers with BeforeAndAfter with MockitoSugar
+{
+
+  // Concrete target name passed to writeOpen. Mockito matchers such as
+  // anyString() are only meaningful inside stubbing/verification calls, so a
+  // real value is handed to the writer under test instead.
+  private val TestTargetName = "some target"
+
+  private val commId = UUID.randomUUID().toString
+  private var clientCommWriter: ClientCommWriter = _
+  private var kernelMessageBuilder: KMBuilder = _
+
+  private var actorLoader: ActorLoader = _
+  private var shellSocketProbe: TestProbe = _
+
+  /**
+   * Retrieves the next message available.
+   *
+   * @return The KernelMessage instance (the test fails with an assertion
+   *         error if none arrives within 200 milliseconds)
+   */
+  private def getNextMessage: KernelMessage =
+    shellSocketProbe.expectMsgClass(200.milliseconds, classOf[KernelMessage])
+
+  /**
+   * Retrieves the next message available and returns its type.
+   *
+   * @return The type of the message (pulled from message header)
+   */
+  private def getNextMessageType = getNextMessage.header.msg_type
+
+  /**
+   * Retrieves the next message available and parses the content string.
+   *
+   * @tparam T The type to coerce the content string into
+   *
+   * @return The resulting KernelMessageContent instance
+   */
+  private def getNextMessageContents[T <: KernelMessageContent]
+    (implicit fjs: play.api.libs.json.Reads[T], mf: Manifest[T]) =
+  {
+    val receivedMessage = getNextMessage
+
+    Json.parse(receivedMessage.contentString).as[T]
+  }
+
+  before {
+    kernelMessageBuilder = spy(KMBuilder())
+
+    // Construct path for kernel message relay
+    actorLoader = mock[ActorLoader]
+    shellSocketProbe = TestProbe()
+    val shellSocketSelection: ActorSelection =
+      system.actorSelection(shellSocketProbe.ref.path.toString)
+    doReturn(shellSocketSelection)
+      .when(actorLoader).load(SocketType.ShellClient)
+
+    // Create a new writer to use for testing
+    clientCommWriter =
+      new ClientCommWriter(actorLoader, kernelMessageBuilder, commId)
+  }
+
+  describe("ClientCommWriter") {
+    describe("#writeOpen") {
+      it("should send a comm_open message to the relay") {
+        clientCommWriter.writeOpen(TestTargetName)
+
+        getNextMessageType should be (CommOpen.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        val expected = commId
+        clientCommWriter.writeOpen(TestTargetName)
+
+        val actual = getNextMessageContents[CommOpen].comm_id
+
+        actual should be (expected)
+      }
+
+      it("should include the target name in the message") {
+        val expected = "<TARGET_NAME>"
+        clientCommWriter.writeOpen(expected)
+
+        val actual = getNextMessageContents[CommOpen].target_name
+
+        actual should be (expected)
+      }
+
+      it("should provide empty data in the message if no data is provided") {
+        val expected = MsgData.Empty
+        clientCommWriter.writeOpen(TestTargetName)
+
+        val actual = getNextMessageContents[CommOpen].data
+
+        actual should be (expected)
+      }
+
+      it("should include the data in the message") {
+        val expected = MsgData("some key" -> "some value")
+        clientCommWriter.writeOpen(TestTargetName, expected)
+
+        val actual = getNextMessageContents[CommOpen].data
+
+        actual should be (expected)
+      }
+    }
+
+    describe("#writeMsg") {
+      it("should send a comm_msg message to the relay") {
+        clientCommWriter.writeMsg(MsgData.Empty)
+
+        getNextMessageType should be (CommMsg.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        val expected = commId
+        clientCommWriter.writeMsg(MsgData.Empty)
+
+        val actual = getNextMessageContents[CommMsg].comm_id
+
+        actual should be (expected)
+      }
+
+      it("should fail a require if the data is null") {
+        intercept[IllegalArgumentException] {
+          clientCommWriter.writeMsg(null)
+        }
+      }
+
+      it("should include the data in the message") {
+        val expected = MsgData("some key" -> "some value")
+        clientCommWriter.writeMsg(expected)
+
+        val actual = getNextMessageContents[CommMsg].data
+
+        actual should be (expected)
+      }
+    }
+
+    describe("#writeClose") {
+      it("should send a comm_close message to the relay") {
+        clientCommWriter.writeClose()
+
+        getNextMessageType should be (CommClose.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        val expected = commId
+        clientCommWriter.writeClose()
+
+        val actual = getNextMessageContents[CommClose].comm_id
+
+        actual should be (expected)
+      }
+
+      it("should provide empty data in the message if no data is provided") {
+        val expected = MsgData.Empty
+        clientCommWriter.writeClose()
+
+        val actual = getNextMessageContents[CommClose].data
+
+        actual should be (expected)
+      }
+
+      it("should include the data in the message") {
+        val expected = MsgData("some key" -> "some value")
+        clientCommWriter.writeClose(expected)
+
+        val actual = getNextMessageContents[CommClose].data
+
+        actual should be (expected)
+      }
+    }
+
+    describe("#write") {
+      it("should send a comm_msg message to the relay") {
+        clientCommWriter.write(Array('a'), 0, 1)
+
+        getNextMessageType should be (CommMsg.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        val expected = commId
+        clientCommWriter.write(Array('a'), 0, 1)
+
+        val actual = getNextMessageContents[CommMsg].comm_id
+
+        actual should be (expected)
+      }
+
+      it("should package the string as part of the data with a 'message' key") {
+        val expected = MsgData("message" -> "a")
+        clientCommWriter.write(Array('a'), 0, 1)
+
+        val actual = getNextMessageContents[CommMsg].data
+
+        actual should be (expected)
+      }
+    }
+
+    describe("#flush") {
+      it("should do nothing") {
+        // TODO: Is this test necessary? It does nothing.
+        clientCommWriter.flush()
+      }
+    }
+
+    describe("#close") {
+      it("should send a comm_close message to the relay") {
+        clientCommWriter.close()
+
+        getNextMessageType should be (CommClose.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        val expected = commId
+        clientCommWriter.close()
+
+        val actual = getNextMessageContents[CommClose].comm_id
+
+        actual should be (expected)
+      }
+
+      it("should provide empty data in the message") {
+        val expected = MsgData.Empty
+        clientCommWriter.close()
+
+        val actual = getNextMessageContents[CommClose].data
+
+        actual should be (expected)
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/SparkKernelClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/SparkKernelClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/SparkKernelClientSpec.scala
new file mode 100644
index 0000000..8db29f8
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/SparkKernelClientSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client
+
+import akka.actor.ActorSystem
+import akka.testkit.{TestKit, TestProbe}
+import com.ibm.spark.comm.{CommCallbacks, CommStorage, CommRegistrar}
+import com.ibm.spark.kernel.protocol.v5
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.execution.ExecuteRequestTuple
+import scala.concurrent.duration._
+import org.mockito.Mockito._
+import org.mockito.Matchers.{eq => mockEq, _}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
+
+/**
+ * Checks that SparkKernelClient#execute forwards an ExecuteRequestTuple to the
+ * actor loaded for incoming execute requests.
+ */
+class SparkKernelClientSpec
+  extends TestKit(ActorSystem("SparkKernelClientActorSystem"))
+  with Matchers with MockitoSugar with FunSpecLike with BeforeAndAfter
+{
+  private val TestTargetName = "some target"
+
+  private var mockActorLoader: ActorLoader = _
+  private var mockCommRegistrar: CommRegistrar = _
+  private var sparkKernelClient: SparkKernelClient = _
+  private var executeRequestProbe: TestProbe = _
+  private var shellClientProbe: TestProbe = _
+
+  before {
+    mockActorLoader = mock[ActorLoader]
+    mockCommRegistrar = mock[CommRegistrar]
+
+    // Route execute requests loaded through the actor loader to a probe
+    executeRequestProbe = TestProbe()
+    when(mockActorLoader.load(MessageType.Incoming.ExecuteRequest))
+      .thenReturn(system.actorSelection(executeRequestProbe.ref.path.toString))
+
+    // Route shell client socket lookups to a separate probe
+    shellClientProbe = TestProbe()
+    when(mockActorLoader.load(SocketType.ShellClient))
+      .thenReturn(system.actorSelection(shellClientProbe.ref.path.toString))
+
+    sparkKernelClient = new SparkKernelClient(
+      mockActorLoader, system, mockCommRegistrar)
+  }
+
+  describe("SparkKernelClient") {
+    describe("#execute") {
+      it("should send an ExecuteRequest message") {
+        sparkKernelClient.execute("val foo = 2")
+        executeRequestProbe.expectMsgClass(classOf[ExecuteRequestTuple])
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
new file mode 100644
index 0000000..db3205c
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.execution
+
+import com.ibm.spark.kernel.protocol.v5.content.{StreamContent, ExecuteResult}
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.ibm.spark.kernel.protocol.v5._
+import org.scalatest.{Matchers, FunSpec}
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.concurrent.Promise
+import scala.util.{Try, Failure, Success}
+
+/**
+ * Shared fixtures and callbacks used by the DeferredExecution tests.
+ */
+object DeferredExecutionTest {
+  val executeReplyError = Option(ExecuteReplyError(1, None, None, None))
+  val executeReplyOk = Option(ExecuteReplyOk(1, None, None))
+  val executeResult = Option(ExecuteResult(1, Data(), Metadata()))
+
+  /**
+   * Feeds the supplied messages into the deferred execution: the execute
+   * result first (if present), then the execute reply (if present).
+   *
+   * @param deferredExecution The deferred to resolve
+   * @param executeReplyOption The reply to resolve with, or None to skip
+   * @param executeResultOption The result to resolve with, or None to skip
+   */
+  def mockSendingKernelMessages(deferredExecution: DeferredExecution,
+    executeReplyOption: Option[ExecuteReply],
+    executeResultOption: Option[ExecuteResult]): Unit = {
+    // Mock behaviour of the kernel sending messages to us; foreach is used
+    // because only the side effect matters and no value is produced
+    executeResultOption.foreach { executeResult =>
+      deferredExecution.resolveResult(executeResult)
+    }
+    executeReplyOption.foreach { executeReply =>
+      deferredExecution.resolveReply(executeReply)
+    }
+  }
+
+  /** Result callback that completes the implicit promise with a marker. */
+  def processExecuteResult(executeResult: ExecuteResult)
+    (implicit promise: Promise[Try[ExecuteResultPromise]]): Unit = {
+    promise.success(Success(new ExecuteResultPromise))
+  }
+
+  /** Error callback that completes the implicit promise with a marker. */
+  def processExecuteReply(executeReplyError: ExecuteReplyError)
+    (implicit promise: Promise[Try[ExecuteReplyPromise]]): Unit = {
+    promise.success(Success(new ExecuteReplyPromise))
+  }
+
+  /** Success callback that completes the implicit promise with "Success". */
+  def processOnSuccessfulCompletion(executeReplyOk: ExecuteReplyOk)
+    (implicit promise: Promise[Try[String]]): Unit = {
+    promise.success(Success("Success"))
+  }
+}
+
+/** Marker value used to complete promises from the execute-result callback. */
+class ExecuteResultPromise
+/** Marker value used to complete promises from the execute-reply callback. */
+class ExecuteReplyPromise
+
+/**
+ * Tests which DeferredExecution callbacks (onResult, onStream, onError,
+ * onSuccess) run for each combination of execute reply and execute result.
+ * Promises completed from inside the callbacks are used to observe whether a
+ * callback ran.
+ */
+class DeferredExecutionTest extends FunSpec with ScalaFutures with Matchers {
+  import DeferredExecutionTest._
+  describe("DeferredExecution") {
+    describe("onResult( callback )"){
+      it("should run all onResult callbacks when ExecuteResult and " +
+        "successful ExecuteReply are returned") {
+        implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onResult(processExecuteResult)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
+
+        whenReady(executeResultPromise.future) {
+          case Success(_) => // Reaching here means the callback ran
+          case Failure(exception) =>
+            fail("Promise resolved with failure when processing " +
+              "execute result.", exception)
+        }
+      }
+      it("should run all onResultCallbacks registered after deferred has " +
+        "been resolved") {
+        val executeResultPromise: Promise[Int] = Promise()
+        var counter = 0
+        def processExecuteResult (executeResult: ExecuteResult) : Unit = {
+          counter = counter + 1
+          // Hack to allow callbacks to occur after meeting our assertion value
+          if(counter == 2) {
+            Thread.sleep(500)
+            executeResultPromise.success(counter)
+          }
+        }
+
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onResult(processExecuteResult)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
+        // register callback after code execution has completed
+        deferredExecution.onResult(processExecuteResult)
+
+        whenReady(executeResultPromise.future){ _ => counter should be(2) }
+
+      }
+      it("should not run onResult callbacks when ExecuteReply is a failure") {
+        // This promise should be resolved by the deferred
+        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
+        // This promise should not be resolved by the deferred
+        implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
+
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onError(processExecuteReply)
+          .onResult(processExecuteResult)
+        mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
+
+        whenReady(executeReplyPromise.future) { _ =>
+          executeResultPromise.isCompleted should be(false)
+        }
+      }
+    }
+    describe("onStream( callback )"){
+      it("should execute all streaming callbacks") {
+        var counter = 0
+        val streamingResultPromise: Promise[Int] = Promise()
+        // Two registrations times two emitted messages gives four invocations
+        def processStreamContent (streamContent: StreamContent) : Unit = {
+          counter = counter + 1
+          if (counter == 4)
+            streamingResultPromise.success(counter)
+        }
+
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onStream(processStreamContent)
+          .onStream(processStreamContent)
+
+        deferredExecution.emitStreamContent(StreamContent("stdout","msg"))
+        deferredExecution.emitStreamContent(StreamContent("stdout","msg2"))
+
+        whenReady(streamingResultPromise.future){ counterValue =>
+          counterValue should be(4)
+        }
+      }
+    }
+
+    describe("onError( callback )") {
+      it("should run all onError callbacks") {
+        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onError(processExecuteReply)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
+
+        whenReady(executeReplyPromise.future) {
+          case Success(_) => // Reaching here means the callback ran
+          case Failure(exception) =>
+            fail("Promise resolved with failure while trying to " +
+              "process execute reply.", exception)
+        }
+      }
+      it("should not run onError callbacks when ExecuteReply is a success") {
+        // This promise and callback should not be executed by the deferred
+        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
+        // This promise and callback should be executed by the deferred
+        implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
+
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onError(processExecuteReply)
+          .onResult(processExecuteResult)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
+
+        whenReady(executeResultPromise.future) { _ =>
+          executeReplyPromise.isCompleted should be(false)
+        }
+      }
+    }
+    describe("onSuccessfulCompletion( callback )") {
+      it("should run all onSuccessfulCompletion callbacks on ExecuteReplyOk and ExecuteResult") {
+        implicit val executeCompletePromise: Promise[Try[String]] = Promise()
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onSuccess(processOnSuccessfulCompletion)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
+
+        whenReady(executeCompletePromise.future) {
+          case Success(_) => // Nothing to do for the successful case
+          case Failure(exception) =>
+            fail("Promise resolved with failure while trying to " +
+              "process execute result.", exception)
+        }
+      }
+      it("should run all onSuccessfulCompletion callbacks on ExecuteReplyOk and None") {
+        implicit val executeCompletePromise: Promise[Try[String]] = Promise()
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onSuccess(processOnSuccessfulCompletion)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, None)
+
+        whenReady(executeCompletePromise.future) {
+          case Success(_) => // Nothing to do for the successful case
+          case Failure(exception) =>
+            fail("Promise resolved with failure while trying to " +
+              "process execute result.", exception)
+        }
+      }
+      it("should not run onSuccessfulCompletion callbacks on ExecuteReplyError") {
+        implicit val executeCompletePromise: Promise[Try[String]] = Promise()
+        // This promise and callback should not be executed by the deferred
+        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
+
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onError(processExecuteReply)
+          .onSuccess(processOnSuccessfulCompletion)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
+
+        whenReady(executeReplyPromise.future) { _ =>
+          executeCompletePromise.isCompleted should be(false)
+        }
+      }
+    }
+  }
+}