You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:36 UTC

[28/51] [abbrv] incubator-toree git commit: Moved scala files to new locations based on new package

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConfig.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConfig.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConfig.scala
new file mode 100644
index 0000000..6f3789d
--- /dev/null
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConfig.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import com.typesafe.config.Config
+import play.api.libs.json.Json
+
+case class SocketConfig (
+  stdin_port: Int,
+  control_port: Int,
+  hb_port: Int,
+  shell_port: Int,
+  iopub_port: Int,
+  ip : String,
+  transport: String,
+  signature_scheme: String,
+  key: String
+)
+
+object SocketConfig {
+  implicit val socketConfigReads = Json.reads[SocketConfig]
+  implicit val socketConfigWrites = Json.writes[SocketConfig]
+
+  def fromConfig(config: Config) = {
+    new SocketConfig(
+      config.getInt("stdin_port"),
+      config.getInt("control_port"),
+      config.getInt("hb_port"),
+      config.getInt("shell_port"),
+      config.getInt("iopub_port"),
+      config.getString("ip"),
+      config.getString("transport"),
+      config.getString("signature_scheme"),
+      config.getString("key")
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConnection.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConnection.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConnection.scala
new file mode 100644
index 0000000..fae1d0e
--- /dev/null
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketConnection.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+object SocketConnection {
+  def apply(protocol: String, ip: String, port: Int) =
+    new SocketConnection(protocol, ip, port)
+}
+
+/**
+ * Represent a connection string for a socket
+ * @param protocol The protocol portion of the connection (e.g. tcp, akka, udp)
+ * @param ip The hostname or ip address to bind on (e.g. *, myhost, 127.0.0.1)
+ * @param port The port for the socket to listen on
+ */
+class SocketConnection(protocol: String, ip: String, port: Int) {
+  private val SocketConnectionString : String = "%s://%s:%d"
+
+  override def toString: String = {
+    SocketConnectionString.format(protocol, ip, port)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketFactory.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketFactory.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketFactory.scala
new file mode 100644
index 0000000..07b05d7
--- /dev/null
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/SocketFactory.scala
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import java.util.UUID
+
+import akka.actor.{Props, ActorRef, ActorSystem}
+import com.ibm.spark.communication.actors.{DealerSocketActor, ReqSocketActor, SubSocketActor}
+
+object SocketFactory {
+  def apply(socketConfig: SocketConfig) = {
+    new SocketFactory(socketConfig)
+  }
+}
+
+/**
+ * A Factory class to provide various socket connections for IPython Kernel Spec.
+ *
+ * @param socketConfig The configuration for the sockets to be properly
+ *                     instantiated
+ */
+class  SocketFactory(socketConfig: SocketConfig) {
+  /**
+   * Represents the identity shared between Shell and Stdin connections.
+   */
+  private val ZmqIdentity = UUID.randomUUID().toString
+
+  val HeartbeatConnection = SocketConnection(
+    socketConfig.transport, socketConfig.ip, socketConfig.hb_port)
+  val ShellConnection = SocketConnection(
+    socketConfig.transport, socketConfig.ip, socketConfig.shell_port)
+  val IOPubConnection = SocketConnection(
+    socketConfig.transport, socketConfig.ip, socketConfig.iopub_port)
+  val StdinConnection = SocketConnection(
+    socketConfig.transport, socketConfig.ip, socketConfig.stdin_port)
+
+  /**
+   * Creates a ZeroMQ request socket representing the client endpoint for
+   * heartbeat messages.
+   *
+   * @param system The actor system the socket actor will belong
+   * @param listener The actor who will receive messages from the socket
+   *
+   * @return The ActorRef created for the socket connection
+   */
+  def HeartbeatClient(system: ActorSystem, listener: ActorRef) : ActorRef = {
+    system.actorOf(Props(classOf[ReqSocketActor], HeartbeatConnection.toString, listener))
+//    ZeroMQExtension(system).newReqSocket(Array(
+//      Listener(listener), Connect(HeartbeatConnection.toString)
+//    ))
+  }
+
+  /**
+   * Creates a ZeroMQ dealer socket representing the client endpoint for shell
+   * messages. Generates an id for
+   * <a href="http://api.zeromq.org/2-1:zmq-setsockopt#toc6">
+   * Router/Dealer message routing</a>.
+   *
+   * @param system The actor system the socket actor will belong
+   * @param listener The actor who will receive messages from the socket
+   *
+   * @return The ActorRef created for the socket connection
+   */
+  def ShellClient(system: ActorSystem, listener: ActorRef) : ActorRef = {
+    system.actorOf(Props(classOf[DealerSocketActor], ShellConnection.toString, listener))
+    //socket.setIdentity(ZmqIdentity)
+//    ZeroMQExtension(system).newDealerSocket(Array(
+//      Listener(listener), Connect(ShellConnection.toString),
+//      Identity(ZmqIdentity)
+//    ))
+  }
+
+  /**
+   * Creates a ZeroMQ dealer socket representing the client endpoint for stdin
+   * messages. Generates an id for
+   * <a href="http://api.zeromq.org/2-1:zmq-setsockopt#toc6">
+   * Router/Dealer message routing</a>.
+   *
+   * @param system The actor system the socket actor will belong
+   * @param listener The actor who will receive messages from the socket
+   *
+   * @return The ActorRef created for the socket connection
+   */
+  def StdinClient(system: ActorSystem, listener: ActorRef) : ActorRef = {
+    system.actorOf(Props(classOf[DealerSocketActor], StdinConnection.toString, listener))
+    //socket.setIdentity(ZmqIdentity)
+//    ZeroMQExtension(system).newDealerSocket(Array(
+//      Listener(listener), Connect(StdinConnection.toString),
+//      Identity(ZmqIdentity)
+//    ))
+  }
+
+  /**
+   * Creates a ZeroMQ subscribe socket representing the client endpoint for IOPub
+   * messages.
+   *
+   * @param system The actor system the socket actor will belong
+   * @param listener The actor who will receive messages from the socket
+   *
+   * @return The ActorRef created for the socket connection
+   */
+  def IOPubClient(system: ActorSystem, listener: ActorRef) : ActorRef = {
+    system.actorOf(Props(classOf[SubSocketActor], IOPubConnection.toString, listener))
+    //socket.subscribe(ZMQ.SUBSCRIPTION_ALL)
+//    ZeroMQExtension(system).newSubSocket(Array(
+//      Listener(listener), Connect(IOPubConnection.toString), SubscribeAll
+//    ))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClient.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClient.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClient.scala
new file mode 100644
index 0000000..22c6071
--- /dev/null
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClient.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import akka.actor.Actor
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.communication.security.SecurityActorType
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.{HeaderBuilder, KMBuilder, KernelMessage}
+import com.ibm.spark.kernel.protocol.v5.content.{InputReply, InputRequest}
+import com.ibm.spark.utils.LogLike
+import com.ibm.spark.kernel.protocol.v5.client.Utilities._
+import play.api.libs.json.Json
+
+import StdinClient._
+import akka.pattern.ask
+
+import scala.concurrent.duration._
+import scala.concurrent.Await
+
+object StdinClient {
+  type ResponseFunction = (String, Boolean) => String
+  val EmptyResponseFunction: ResponseFunction = (_, _) => ""
+}
+
+/**
+ * The client endpoint for Stdin messages specified in the IPython Kernel Spec
+ * @param socketFactory A factory to create the ZeroMQ socket connection
+ * @param actorLoader The loader used to retrieve actors
+ * @param signatureEnabled Whether or not to check and provide signatures
+ */
+class StdinClient(
+  socketFactory: SocketFactory,
+  actorLoader: ActorLoader,
+  signatureEnabled: Boolean
+) extends Actor with LogLike {
+  logger.debug("Created stdin client actor")
+
+  private val socket = socketFactory.StdinClient(context.system, self)
+
+  /**
+   * The function to use for generating a response from an input_request
+   * message.
+   */
+  private var responseFunc: ResponseFunction = EmptyResponseFunction
+
+  override def receive: Receive = {
+    case responseFunc: ResponseFunction =>
+      logger.debug("Updating response function")
+      this.responseFunc = responseFunc
+
+    case message: ZMQMessage =>
+      logger.debug("Received stdin kernel message")
+      val kernelMessage: KernelMessage = message
+      val messageType = kernelMessage.header.msg_type
+
+      if (messageType == InputRequest.toTypeString) {
+        logger.debug("Message is an input request")
+
+        val inputRequest =
+          Json.parse(kernelMessage.contentString).as[InputRequest]
+        val value = responseFunc(inputRequest.prompt, inputRequest.password)
+        val inputReply = InputReply(value)
+
+        val newKernelMessage = KMBuilder()
+          .withParent(kernelMessage)
+          .withHeader(HeaderBuilder.empty.copy(
+            msg_type = InputReply.toTypeString,
+            session = getSessionId
+          ))
+          .withContentString(inputReply)
+          .build
+
+        import scala.concurrent.ExecutionContext.Implicits.global
+        val messageWithSignature = if (signatureEnabled) {
+          val signatureManager =
+            actorLoader.load(SecurityActorType.SignatureManager)
+          val signatureMessage = signatureManager ? newKernelMessage
+          Await.result(signatureMessage, 100.milliseconds)
+            .asInstanceOf[KernelMessage]
+        } else newKernelMessage
+
+        val zmqMessage: ZMQMessage = messageWithSignature
+
+        socket ! zmqMessage
+      } else {
+        logger.debug(s"Unknown message of type $messageType")
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/comm/ClientCommManagerSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/comm/ClientCommManagerSpec.scala b/client/src/test/scala/com/ibm/spark/comm/ClientCommManagerSpec.scala
deleted file mode 100644
index 85ef4aa..0000000
--- a/client/src/test/scala/com/ibm/spark/comm/ClientCommManagerSpec.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.ibm.spark.comm
-
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.content.CommContent
-import org.scalatest.mock.MockitoSugar
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
-
-class ClientCommManagerSpec extends FunSpec with Matchers with BeforeAndAfter
-  with MockitoSugar
-{
-  private val TestTargetName = "some target"
-
-  private var mockActorLoader: ActorLoader = _
-  private var mockKMBuilder: KMBuilder = _
-  private var mockCommRegistrar: CommRegistrar = _
-  private var clientCommManager: ClientCommManager = _
-
-  private var generatedCommWriter: CommWriter = _
-
-  before {
-    mockActorLoader = mock[ActorLoader]
-    mockKMBuilder = mock[KMBuilder]
-    mockCommRegistrar = mock[CommRegistrar]
-
-    clientCommManager = new ClientCommManager(
-      mockActorLoader,
-      mockKMBuilder,
-      mockCommRegistrar
-    ) {
-      override protected def newCommWriter(commId: UUID): CommWriter = {
-        val commWriter = super.newCommWriter(commId)
-
-        generatedCommWriter = commWriter
-
-        val spyCommWriter = spy(commWriter)
-        doNothing().when(spyCommWriter)
-          .sendCommKernelMessage(any[KernelMessageContent with CommContent])
-
-        spyCommWriter
-      }
-    }
-  }
-
-  describe("ClientCommManager") {
-    describe("#open") {
-      it("should return a wrapped instance of ClientCommWriter") {
-        clientCommManager.open(TestTargetName, v5.MsgData.Empty)
-
-        // Exposed hackishly for testing
-        generatedCommWriter shouldBe a [ClientCommWriter]
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/comm/ClientCommWriterSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/comm/ClientCommWriterSpec.scala b/client/src/test/scala/com/ibm/spark/comm/ClientCommWriterSpec.scala
deleted file mode 100644
index f5a4b43..0000000
--- a/client/src/test/scala/com/ibm/spark/comm/ClientCommWriterSpec.scala
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.comm
-
-import java.util.UUID
-
-import akka.actor.{ActorSelection, ActorSystem}
-import akka.testkit.{TestKit, TestProbe}
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.content._
-import com.typesafe.config.ConfigFactory
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import play.api.libs.json.Json
-
-import scala.concurrent.duration._
-
-object ClientCommWriterSpec {
-  val config ="""
-    akka {
-      loglevel = "WARNING"
-    }"""
-}
-
-class ClientCommWriterSpec extends TestKit(
-  ActorSystem("ClientCommWriterSpec",
-    ConfigFactory.parseString(ClientCommWriterSpec.config))
-) with FunSpecLike with Matchers with BeforeAndAfter with MockitoSugar
-{
-
-  private val commId = UUID.randomUUID().toString
-  private var clientCommWriter: ClientCommWriter = _
-  private var kernelMessageBuilder: KMBuilder = _
-
-  private var actorLoader: ActorLoader = _
-  private var shellSocketProbe: TestProbe = _
-
-  /**
-   * Retrieves the next message available.
-   *
-   * @return The KernelMessage instance (or an error if timed out)
-   */
-  private def getNextMessage =
-    shellSocketProbe.receiveOne(200.milliseconds)
-      .asInstanceOf[KernelMessage]
-
-  /**
-   * Retrieves the next message available and returns its type.
-   *
-   * @return The type of the message (pulled from message header)
-   */
-  private def getNextMessageType = getNextMessage.header.msg_type
-
-  /**
-   * Retrieves the next message available and parses the content string.
-   *
-   * @tparam T The type to coerce the content string into
-   *
-   * @return The resulting KernelMessageContent instance
-   */
-  private def getNextMessageContents[T <: KernelMessageContent]
-    (implicit fjs: play.api.libs.json.Reads[T], mf: Manifest[T]) =
-  {
-    val receivedMessage = getNextMessage
-
-    Json.parse(receivedMessage.contentString).as[T]
-  }
-
-  before {
-    kernelMessageBuilder = spy(KMBuilder())
-
-    // Construct path for kernel message relay
-    actorLoader = mock[ActorLoader]
-    shellSocketProbe = TestProbe()
-    val shellSocketSelection: ActorSelection =
-      system.actorSelection(shellSocketProbe.ref.path.toString)
-    doReturn(shellSocketSelection)
-      .when(actorLoader).load(SocketType.ShellClient)
-
-    // Create a new writer to use for testing
-    clientCommWriter =
-      new ClientCommWriter(actorLoader, kernelMessageBuilder, commId)
-  }
-
-  describe("ClientCommWriter") {
-    describe("#writeOpen") {
-      it("should send a comm_open message to the relay") {
-        clientCommWriter.writeOpen(anyString())
-
-        getNextMessageType should be (CommOpen.toTypeString)
-      }
-
-      it("should include the comm_id in the message") {
-        val expected = commId
-        clientCommWriter.writeOpen(anyString())
-
-        val actual = getNextMessageContents[CommOpen].comm_id
-
-        actual should be (expected)
-      }
-
-      it("should include the target name in the message") {
-        val expected = "<TARGET_NAME>"
-        clientCommWriter.writeOpen(expected)
-
-        val actual = getNextMessageContents[CommOpen].target_name
-
-        actual should be (expected)
-      }
-
-      it("should provide empty data in the message if no data is provided") {
-        val expected = MsgData.Empty
-        clientCommWriter.writeOpen(anyString())
-
-        val actual = getNextMessageContents[CommOpen].data
-
-        actual should be (expected)
-      }
-
-      it("should include the data in the message") {
-        val expected = MsgData("some key" -> "some value")
-        clientCommWriter.writeOpen(anyString(), expected)
-
-        val actual = getNextMessageContents[CommOpen].data
-
-        actual should be (expected)
-      }
-    }
-
-    describe("#writeMsg") {
-      it("should send a comm_msg message to the relay") {
-        clientCommWriter.writeMsg(MsgData.Empty)
-
-        getNextMessageType should be (CommMsg.toTypeString)
-      }
-
-      it("should include the comm_id in the message") {
-        val expected = commId
-        clientCommWriter.writeMsg(MsgData.Empty)
-
-        val actual = getNextMessageContents[CommMsg].comm_id
-
-        actual should be (expected)
-      }
-
-      it("should fail a require if the data is null") {
-        intercept[IllegalArgumentException] {
-          clientCommWriter.writeMsg(null)
-        }
-      }
-
-      it("should include the data in the message") {
-        val expected = MsgData("some key" -> "some value")
-        clientCommWriter.writeMsg(expected)
-
-        val actual = getNextMessageContents[CommMsg].data
-
-        actual should be (expected)
-      }
-    }
-
-    describe("#writeClose") {
-      it("should send a comm_close message to the relay") {
-        clientCommWriter.writeClose()
-
-        getNextMessageType should be (CommClose.toTypeString)
-      }
-
-      it("should include the comm_id in the message") {
-        val expected = commId
-        clientCommWriter.writeClose()
-
-        val actual = getNextMessageContents[CommClose].comm_id
-
-        actual should be (expected)
-      }
-
-      it("should provide empty data in the message if no data is provided") {
-        val expected = MsgData.Empty
-        clientCommWriter.writeClose()
-
-        val actual = getNextMessageContents[CommClose].data
-
-        actual should be (expected)
-      }
-
-      it("should include the data in the message") {
-        val expected = MsgData("some key" -> "some value")
-        clientCommWriter.writeClose(expected)
-
-        val actual = getNextMessageContents[CommClose].data
-
-        actual should be (expected)
-      }
-    }
-
-    describe("#write") {
-      it("should send a comm_msg message to the relay") {
-        clientCommWriter.write(Array('a'), 0, 1)
-
-        getNextMessageType should be (CommMsg.toTypeString)
-      }
-
-      it("should include the comm_id in the message") {
-        val expected = commId
-        clientCommWriter.write(Array('a'), 0, 1)
-
-        val actual = getNextMessageContents[CommMsg].comm_id
-
-        actual should be (expected)
-      }
-
-      it("should package the string as part of the data with a 'message' key") {
-        val expected = MsgData("message" -> "a")
-        clientCommWriter.write(Array('a'), 0, 1)
-
-        val actual = getNextMessageContents[CommMsg].data
-
-        actual should be (expected)
-      }
-    }
-
-    describe("#flush") {
-      it("should do nothing") {
-        // TODO: Is this test necessary? It does nothing.
-        clientCommWriter.flush()
-      }
-    }
-
-    describe("#close") {
-      it("should send a comm_close message to the relay") {
-        clientCommWriter.close()
-
-        getNextMessageType should be (CommClose.toTypeString)
-      }
-
-      it("should include the comm_id in the message") {
-        val expected = commId
-        clientCommWriter.close()
-
-        val actual = getNextMessageContents[CommClose].comm_id
-
-        actual should be (expected)
-      }
-
-      it("should provide empty data in the message") {
-        val expected = MsgData.Empty
-        clientCommWriter.close()
-
-        val actual = getNextMessageContents[CommClose].data
-
-        actual should be (expected)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/SparkKernelClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/SparkKernelClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/SparkKernelClientSpec.scala
deleted file mode 100644
index 8db29f8..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/SparkKernelClientSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client
-
-import akka.actor.ActorSystem
-import akka.testkit.{TestKit, TestProbe}
-import com.ibm.spark.comm.{CommCallbacks, CommStorage, CommRegistrar}
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.execution.ExecuteRequestTuple
-import scala.concurrent.duration._
-import org.mockito.Mockito._
-import org.mockito.Matchers.{eq => mockEq, _}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-
-class SparkKernelClientSpec
-  extends TestKit(ActorSystem("SparkKernelClientActorSystem"))
-  with Matchers with MockitoSugar with FunSpecLike with BeforeAndAfter
-{
-  private val TestTargetName = "some target"
-
-  private var mockActorLoader: ActorLoader = _
-  private var mockCommRegistrar: CommRegistrar = _
-  private var sparkKernelClient: SparkKernelClient = _
-  private var executeRequestProbe: TestProbe = _
-  private var shellClientProbe: TestProbe = _
-
-  before {
-    mockActorLoader = mock[ActorLoader]
-    mockCommRegistrar = mock[CommRegistrar]
-
-    executeRequestProbe = TestProbe()
-    when(mockActorLoader.load(MessageType.Incoming.ExecuteRequest))
-      .thenReturn(system.actorSelection(executeRequestProbe.ref.path.toString))
-
-    shellClientProbe = TestProbe()
-    when(mockActorLoader.load(SocketType.ShellClient))
-      .thenReturn(system.actorSelection(shellClientProbe.ref.path.toString))
-
-    sparkKernelClient = new SparkKernelClient(
-      mockActorLoader, system, mockCommRegistrar)
-  }
-
-  describe("SparkKernelClient") {
-    describe("#execute") {
-      it("should send an ExecuteRequest message") {
-        val func = (x: Any) => println(x)
-        sparkKernelClient.execute("val foo = 2")
-        executeRequestProbe.expectMsgClass(classOf[ExecuteRequestTuple])
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
deleted file mode 100644
index db3205c..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.execution
-
-import com.ibm.spark.kernel.protocol.v5.content.{StreamContent, ExecuteResult}
-import com.ibm.spark.kernel.protocol.v5.content._
-import com.ibm.spark.kernel.protocol.v5._
-import org.scalatest.{Matchers, FunSpec}
-import org.scalatest.concurrent.ScalaFutures
-
-import scala.concurrent.Promise
-import scala.util.{Try, Failure, Success}
-
-object DeferredExecutionTest {
-  val executeReplyError  = Option(ExecuteReplyError(1, None, None, None))
-  val executeReplyOk     = Option(ExecuteReplyOk(1, None, None))
-  val executeResult      = Option(ExecuteResult(1, Data(), Metadata()))
-
-  def mockSendingKernelMessages(deferredExecution: DeferredExecution,
-                                executeReplyOption: Option[ExecuteReply],
-                                executeResultOption: Option[ExecuteResult]): Unit = {
-    //  Mock behaviour of the kernel sending messages to us
-    executeResultOption.map {
-      executeResult =>
-      deferredExecution.resolveResult(executeResult)
-    }
-    executeReplyOption.map {
-      executeReply=>
-        deferredExecution.resolveReply(executeReply)
-    }
-  }
-  
-  def processExecuteResult(executeResult: ExecuteResult)
-          (implicit promise: Promise[Try[ExecuteResultPromise]]): Unit = {
-    promise.success(Success(new ExecuteResultPromise))
-  }
-
-  def processExecuteReply(executeReplyError: ExecuteReplyError)
-          (implicit promise: Promise[Try[ExecuteReplyPromise]]): Unit = {
-    promise.success(Success(new ExecuteReplyPromise))
-  }
-
-  def processOnSuccessfulCompletion(executeReplyOk: ExecuteReplyOk)
-    (implicit promise: Promise[Try[String]]): Unit = {
-    promise.success(Success("Success"))
-  }
-}
-
-class ExecuteResultPromise {}
-class ExecuteReplyPromise {}
-
-class DeferredExecutionTest extends FunSpec with ScalaFutures with Matchers {
-  import DeferredExecutionTest._
-  describe("DeferredExecution") {
-    describe("onResult( callback )"){
-      it("should run all onResult callbacks when ExecuteResult and " +
-         "successful ExecuteReply are returned") {
-        implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
-        val deferredExecution: DeferredExecution = DeferredExecution()
-          .onResult(processExecuteResult)
-
-        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
-
-        whenReady(executeResultPromise.future) {
-          case Success(v) => assert(true)
-          case Failure(exception: Throwable) =>
-            fail("Promise resolved with failure when processing " +
-                 "execute result.", exception)
-          case unknownValue=>
-            fail(s"Promised resolved with unknown value: ${unknownValue}")
-        }
-      }
-      it("should run all onResultCallbacks registered after deferred has " +
-         "been resolved") {
-        val executeResultPromise: Promise[Int] = Promise()
-        var counter = 0
-        def processExecuteResult (executeResult: ExecuteResult) : Unit = {
-          counter = counter + 1
-          // Hack to allow callbacks to occur after meeting our assertion value
-          if(counter == 2) {
-            Thread.sleep(500)
-            executeResultPromise.success(counter)
-          }
-        }
-
-        val deferredExecution: DeferredExecution = DeferredExecution()
-          .onResult(processExecuteResult)
-
-        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
-        //  register callback after code execution has completed
-        deferredExecution.onResult(processExecuteResult)
-
-        whenReady(executeResultPromise.future){ _ => counter should be(2) }
-
-      }
-      it("should not run onResult callbacks when ExecuteReply is a failure") {
-        //  This promise should be resolved by the deferred
-        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
-        //  This promise should not be resolved by the deferred
-        implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
-
-        val deferredExecution: DeferredExecution = DeferredExecution()
-          .onError(processExecuteReply)
-          .onResult(processExecuteResult)
-        mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
-
-        whenReady(executeReplyPromise.future) { _ =>
-          executeResultPromise.isCompleted should be(false)
-        }
-      }
-    }
-    describe("onStream( callback )"){
-      it("should execute all streaming callbacks") {
-        var counter = 0
-        val streamingResultPromise: Promise[Int] = Promise()
-        def processStreamContent (streamContent: StreamContent) : Unit = {
-          counter = counter + 1
-          if (counter == 4)
-            streamingResultPromise.success(counter)
-        }
-
-        val deferredExecution: DeferredExecution = DeferredExecution()
-          .onStream(processStreamContent)
-          .onStream(processStreamContent)
-
-        deferredExecution.emitStreamContent(StreamContent("stdout","msg"))
-        deferredExecution.emitStreamContent(StreamContent("stdout","msg2"))
-
-        whenReady(streamingResultPromise.future){ counterValue =>
-          counterValue should be(4)
-        }
-      }
-    }
-    
-    describe("onError( callback )") {
-      it("should run all onError callbacks") {
-        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
-        val deferredExecution: DeferredExecution = DeferredExecution()
-          .onError(processExecuteReply)
-
-        mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
-
-        whenReady(executeReplyPromise.future) {
-          case Success(v) => assert(true)
-          case Failure(exception: Throwable) =>
-            fail("Promised resolved with failure while trying to " +
-                 "process execute result.", exception)
-          case unknownValue=>
-            fail(s"Promised resolved with unknown value: ${unknownValue}")
-        }
-      }
-      it("should not run onError callbacks when ExecuteReply is a success") {
-        //  This promise and callback should not be executed by the deferred
-        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
-        //  This promise and callback should be executed by the deferred
-        implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
-
-        val deferredExecution: DeferredExecution = DeferredExecution()
-          .onError(processExecuteReply)
-          .onResult(processExecuteResult)
-
-        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
-
-        whenReady(executeResultPromise.future) {
-          case _ =>
-            executeReplyPromise.isCompleted should be(false)
-        }
-      }
-    }
-    describe("onSuccessfulCompletion( callback )") {
-      it("should run all onSuccessfulCompletion callbacks on ExecuteReplyOk and ExecuteResult") {
-        implicit val executeCompletePromise: Promise[Try[String]] = Promise()
-        val deferredExecution: DeferredExecution = DeferredExecution()
-          .onSuccess(processOnSuccessfulCompletion)
-
-        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
-
-        whenReady(executeCompletePromise.future) {
-          case Success(s) =>  //  Nothing to do for the successful case
-          case Failure(exception: Throwable) =>
-            fail("Promised resolved with failure while trying to " +
-              "process execute result.", exception)
-          case unknownValue=>
-            fail(s"Promised resolved with unknown value: ${unknownValue}")
-        }
-      }
-      it("should run all onSuccessfulCompletion callbacks on ExecuteReplyOk and None") {
-        implicit val executeCompletePromise: Promise[Try[String]] = Promise()
-        val deferredExecution: DeferredExecution = DeferredExecution()
-          .onSuccess(processOnSuccessfulCompletion)
-
-        mockSendingKernelMessages(deferredExecution, executeReplyOk, None)
-
-        whenReady(executeCompletePromise.future) {
-          case Success(s) =>  //  Nothing to do for the successful case
-          case Failure(exception: Throwable) =>
-            fail("Promised resolved with failure while trying to " +
-              "process execute result.", exception)
-          case unknownValue=>
-            fail(s"Promised resolved with unknown value: ${unknownValue}")
-        }
-      }
-      it("should not run onSuccessfulCompletion callbacks on ExecuteReplyError") {
-        implicit val executeCompletePromise: Promise[Try[String]] = Promise()
-        //  This promise and callback should not be executed by the deferred
-        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
-
-        val deferredExecution: DeferredExecution = DeferredExecution()
-          .onError(processExecuteReply)
-          .onSuccess(processOnSuccessfulCompletion)
-
-        mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
-
-        whenReady(executeReplyPromise.future) {
-          case _ =>
-            executeCompletePromise.isCompleted should be(false)
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
deleted file mode 100644
index 9fdd702..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.socket
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.{TestProbe, ImplicitSender, TestKit}
-import com.ibm.spark.communication.ZMQMessage
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSpecLike}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-
-class HeartbeatClientSpec extends TestKit(ActorSystem("HeartbeatActorSpec"))
-  with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
-
-  describe("HeartbeatClientActor") {
-    val socketFactory = mock[SocketFactory]
-    val mockActorLoader = mock[ActorLoader]
-    val probe : TestProbe = TestProbe()
-    when(socketFactory.HeartbeatClient(any(classOf[ActorSystem]), any(classOf[ActorRef]))).thenReturn(probe.ref)
-
-    val heartbeatClient = system.actorOf(Props(
-      classOf[HeartbeatClient], socketFactory, mockActorLoader, true
-    ))
-
-    describe("send heartbeat") {
-      it("should send ping ZMQMessage") {
-        heartbeatClient ! HeartbeatMessage
-        probe.expectMsgClass(classOf[ZMQMessage])
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/IOPubClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
deleted file mode 100644
index b592dcd..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.socket
-
-import java.util.UUID
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.pattern.ask
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import akka.util.Timeout
-import com.ibm.spark.comm.{CommCallbacks, CommRegistrar, CommStorage, CommWriter}
-import com.ibm.spark.communication.ZMQMessage
-import com.ibm.spark.kernel.protocol.v5
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.Utilities._
-import com.ibm.spark.kernel.protocol.v5.client.execution.{DeferredExecution, DeferredExecutionManager}
-import com.ibm.spark.kernel.protocol.v5.client.{ActorLoader, Utilities}
-import com.ibm.spark.kernel.protocol.v5.content.{CommClose, CommMsg, CommOpen, StreamContent}
-import com.typesafe.config.ConfigFactory
-import org.mockito.Matchers.{eq => mockEq, _}
-import org.mockito.Mockito._
-import org.scalatest.concurrent.{Eventually, ScalaFutures}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.time.{Milliseconds, Span}
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import play.api.libs.json.Json
-
-import scala.concurrent.duration._
-import scala.concurrent.{Future, Promise}
-import scala.util.Failure
-
-object IOPubClientSpec {
-  val config ="""
-    akka {
-      loglevel = "WARNING"
-    }"""
-}
-
-class IOPubClientSpec extends TestKit(ActorSystem(
-  "IOPubClientSpecSystem", ConfigFactory.parseString(IOPubClientSpec.config)
-)) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
-  with ScalaFutures with BeforeAndAfter with Eventually
-{
-  private val TestTimeout = Timeout(10.seconds)
-  implicit override val patienceConfig = PatienceConfig(
-    timeout = scaled(Span(200, Milliseconds)),
-    interval = scaled(Span(5, Milliseconds))
-  )
-  private val SignatureEnabled = true
-
-  private var clientSocketProbe: TestProbe = _
-  private var mockClientSocketFactory: SocketFactory = _
-  private var mockActorLoader: ActorLoader = _
-  private var mockCommRegistrar: CommRegistrar = _
-  private var spyCommStorage: CommStorage = _
-  private var mockCommCallbacks: CommCallbacks = _
-  private var ioPubClient: ActorRef = _
-
-  private var kmBuilder: KMBuilder = _
-
-  private val id = UUID.randomUUID().toString
-  private val TestTargetName = "some target"
-  private val TestCommId = UUID.randomUUID().toString
-
-  before {
-    kmBuilder = KMBuilder()
-    mockCommCallbacks = mock[CommCallbacks]
-    mockCommRegistrar = mock[CommRegistrar]
-
-    spyCommStorage = spy(new CommStorage())
-
-    clientSocketProbe = TestProbe()
-    mockActorLoader = mock[ActorLoader]
-    mockClientSocketFactory = mock[SocketFactory]
-
-    //  Stub the return value for the socket factory
-    when(mockClientSocketFactory.IOPubClient(anyObject(), any[ActorRef]))
-      .thenReturn(clientSocketProbe.ref)
-
-    //  Construct the object we will test against
-    ioPubClient = system.actorOf(Props(
-      classOf[IOPubClient], mockClientSocketFactory, mockActorLoader,
-      SignatureEnabled, mockCommRegistrar, spyCommStorage
-    ))
-  }
-
-  describe("IOPubClient") {
-    describe("#receive") {
-      it("should execute all Comm open callbacks on comm_open message") {
-        val message: ZMQMessage = kmBuilder
-          .withHeader(CommOpen.toTypeString)
-          .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
-          .build
-
-        // Mark as target being provided
-        doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
-          .getTargetCallbacks(anyString())
-
-        // Simulate receiving a message from the kernel
-        ioPubClient ! message
-
-        // Check to see if "eventually" the callback is triggered
-        eventually {
-          verify(mockCommCallbacks).executeOpenCallbacks(
-            any[CommWriter], mockEq(TestCommId),
-            mockEq(TestTargetName), any[v5.MsgData])
-        }
-      }
-
-      it("should not execute Comm open callbacks if the target is not found") {
-        val message: ZMQMessage = kmBuilder
-          .withHeader(CommOpen.toTypeString)
-          .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
-          .build
-
-        // Mark as target NOT being provided
-        doReturn(None).when(spyCommStorage).getTargetCallbacks(anyString())
-
-        // Simulate receiving a message from the kernel
-        ioPubClient ! message
-
-        // Check to see if "eventually" the callback is NOT triggered
-        eventually {
-          // Check that we have checked if the target exists
-          verify(spyCommStorage).getTargetCallbacks(TestTargetName)
-
-          verify(mockCommCallbacks, never()).executeOpenCallbacks(
-            any[CommWriter], mockEq(TestCommId),
-            mockEq(TestTargetName), any[v5.MsgData])
-          verify(mockCommRegistrar, never()).link(TestTargetName, TestCommId)
-        }
-      }
-
-      it("should execute all Comm msg callbacks on comm_msg message") {
-        val message: ZMQMessage = kmBuilder
-          .withHeader(CommMsg.toTypeString)
-          .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
-          .build
-
-        // Mark as target being provided
-        doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
-          .getCommIdCallbacks(any[v5.UUID])
-
-        // Simulate receiving a message from the kernel
-        ioPubClient ! message
-
-        // Check to see if "eventually" the callback is triggered
-        eventually {
-          verify(mockCommCallbacks).executeMsgCallbacks(
-            any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
-        }
-      }
-
-      it("should not execute Comm msg callbacks if the Comm id is not found") {
-        val message: ZMQMessage = kmBuilder
-          .withHeader(CommMsg.toTypeString)
-          .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
-          .build
-
-        // Mark as target NOT being provided
-        doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID])
-
-        // Simulate receiving a message from the kernel
-        ioPubClient ! message
-
-        // Check to see if "eventually" the callback is NOT triggered
-        eventually {
-          // Check that we have checked if the target exists
-          verify(spyCommStorage).getCommIdCallbacks(TestCommId)
-
-          verify(mockCommCallbacks, never()).executeMsgCallbacks(
-            any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
-        }
-      }
-
-      it("should execute all Comm close callbacks on comm_close message") {
-        val message: ZMQMessage = kmBuilder
-          .withHeader(CommClose.toTypeString)
-          .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
-          .build
-
-        // Mark as target being provided
-        doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
-          .getCommIdCallbacks(any[v5.UUID])
-
-        // Simulate receiving a message from the kernel
-        ioPubClient ! message
-
-        // Check to see if "eventually" the callback is triggered
-        eventually {
-          verify(mockCommCallbacks).executeCloseCallbacks(
-            any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
-        }
-      }
-
-      it("should not execute Comm close callbacks if Comm id is not found") {
-        val message: ZMQMessage = kmBuilder
-          .withHeader(CommClose.toTypeString)
-          .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
-          .build
-
-        // Mark as target NOT being provided
-        doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID])
-
-        // Simulate receiving a message from the kernel
-        ioPubClient ! message
-
-        // Check to see if "eventually" the callback is NOT triggered
-        eventually {
-          // Check that we have checked if the target exists
-          verify(spyCommStorage).getCommIdCallbacks(TestCommId)
-
-          verify(mockCommCallbacks, never()).executeCloseCallbacks(
-            any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
-        }
-      }
-
-      it("should call a registered callback on stream message") {
-        val result = StreamContent("foo", "bar")
-        val header = Header(id, "spark", id,
-          MessageType.Outgoing.Stream.toString, "5.0")
-        val parentHeader = Header(id, "spark", id,
-          MessageType.Incoming.ExecuteRequest.toString, "5.0")
-
-        val kernelMessage = new KernelMessage(
-          Seq[String](),
-          "",
-          header,
-          parentHeader,
-          Metadata(),
-          Json.toJson(result).toString()
-        )
-        val promise: Promise[String] = Promise()
-        val de: DeferredExecution = DeferredExecution().onStream(
-          (content: StreamContent) => {
-            promise.success(content.text)
-          }
-        )
-        DeferredExecutionManager.add(id, de)
-        // Send the message to the IOPubClient
-        val zmqMessage: ZMQMessage = kernelMessage
-
-        ioPubClient ! zmqMessage
-
-        whenReady(promise.future) {
-          case res: String =>
-            res should be eq("bar")
-          case _ =>
-            fail(s"Received failure when asking IOPubClient")
-        }
-      }
-
-      it("should not invoke callback when stream message's parent header is null") {
-        // Construct the kernel message
-        val result = StreamContent("foo", "bar")
-        val header = Header(id, "spark", id,
-          MessageType.Outgoing.Stream.toString, "5.0")
-
-        val kernelMessage = new KernelMessage(
-          Seq[String](),
-          "",
-          header,
-          null,
-          Metadata(),
-          Json.toJson(result).toString()
-        )
-
-        // Send the message to the IOPubClient
-        val zmqMessage: ZMQMessage = kernelMessage
-        val futureResult: Future[Any] = ioPubClient.ask(zmqMessage)(TestTimeout)
-        whenReady(futureResult) {
-          case result: Failure[Any] =>
-            //  Getting the value of the failure will cause the underlying exception will be thrown
-            try {
-              result.get
-            } catch {
-              case t:RuntimeException =>
-                t.getMessage should be("Parent Header was null in Kernel Message.")
-            }
-          case result =>
-            fail(s"Did not receive failure!! ${result}")
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/ShellClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/ShellClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/ShellClientSpec.scala
deleted file mode 100644
index 0110dfd..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/ShellClientSpec.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.socket
-
-import java.util.UUID
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.{TestProbe, ImplicitSender, TestKit}
-import com.ibm.spark.communication.ZMQMessage
-import com.ibm.spark.communication.security.SecurityActorType
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.content.ExecuteRequest
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSpecLike}
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import play.api.libs.json.Json
-
-class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec"))
-  with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
-  private val SignatureEnabled = true
-
-  describe("ShellClientActor") {
-    val socketFactory = mock[SocketFactory]
-    val mockActorLoader = mock[ActorLoader]
-    val probe : TestProbe = TestProbe()
-    when(socketFactory.ShellClient(
-      any(classOf[ActorSystem]), any(classOf[ActorRef])
-    )).thenReturn(probe.ref)
-
-    val signatureManagerProbe = TestProbe()
-    doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString))
-      .when(mockActorLoader).load(SecurityActorType.SignatureManager)
-
-    val shellClient = system.actorOf(Props(
-      classOf[ShellClient], socketFactory, mockActorLoader, SignatureEnabled
-    ))
-
-    describe("send execute request") {
-      it("should send execute request") {
-        val request = ExecuteRequest(
-          "foo", false, true, UserExpressions(), true
-        )
-        val header = Header(
-          UUID.randomUUID().toString, "spark",
-          UUID.randomUUID().toString, MessageType.Incoming.ExecuteRequest.toString,
-          "5.0"
-        )
-        val kernelMessage = KernelMessage(
-          Seq[String](), "",
-          header, HeaderBuilder.empty,
-          Metadata(), Json.toJson(request).toString
-        )
-        shellClient ! kernelMessage
-
-        // Echo back the kernel message sent to have a signature injected
-        signatureManagerProbe.expectMsgClass(classOf[KernelMessage])
-        signatureManagerProbe.reply(kernelMessage)
-
-        probe.expectMsgClass(classOf[ZMQMessage])
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/StdinClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/StdinClientSpec.scala b/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/StdinClientSpec.scala
deleted file mode 100644
index 877c4f5..0000000
--- a/client/src/test/scala/com/ibm/spark/kernel/protocol/v5/client/socket/StdinClientSpec.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.ibm.spark.kernel.protocol.v5.client.socket
-
-import akka.actor.{ActorRef, Props, ActorSystem}
-import akka.testkit.{TestProbe, ImplicitSender, TestKit}
-import com.ibm.spark.communication.ZMQMessage
-import com.ibm.spark.communication.security.SecurityActorType
-import com.ibm.spark.kernel.protocol.v5._
-import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
-import com.ibm.spark.kernel.protocol.v5.client.socket.StdinClient.ResponseFunction
-import com.ibm.spark.kernel.protocol.v5.content.{InputReply, InputRequest, ClearOutput, ExecuteRequest}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
-import com.ibm.spark.kernel.protocol.v5.client.Utilities._
-import play.api.libs.json.Json
-import scala.concurrent.duration._
-
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-class StdinClientSpec extends TestKit(ActorSystem("StdinActorSpec"))
-  with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
-  with BeforeAndAfter
-{
-  private val SignatureEnabled = true
-  private val TestReplyString = "some value"
-  private val TestResponseFunc: ResponseFunction = (_, _) => TestReplyString
-
-  private var mockSocketFactory: SocketFactory = _
-  private var mockActorLoader: ActorLoader = _
-  private var signatureManagerProbe: TestProbe = _
-  private var socketProbe: TestProbe = _
-  private var stdinClient: ActorRef = _
-
-  before {
-    socketProbe = TestProbe()
-    signatureManagerProbe = TestProbe()
-    mockSocketFactory = mock[SocketFactory]
-    mockActorLoader = mock[ActorLoader]
-    doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString))
-      .when(mockActorLoader).load(SecurityActorType.SignatureManager)
-    doReturn(socketProbe.ref).when(mockSocketFactory)
-      .StdinClient(any[ActorSystem], any[ActorRef])
-
-    stdinClient = system.actorOf(Props(
-      classOf[StdinClient], mockSocketFactory, mockActorLoader, SignatureEnabled
-    ))
-
-    // Set the response function for our client socket
-    stdinClient ! TestResponseFunc
-  }
-
-  describe("StdinClient") {
-    describe("#receive") {
-      it("should update the response function if receiving a new one") {
-        val expected = "some other value"
-        val replacementFunc: ResponseFunction = (_, _) => expected
-
-        // Update the function
-        stdinClient ! replacementFunc
-
-        val inputRequestMessage: ZMQMessage = KMBuilder()
-          .withHeader(InputRequest.toTypeString)
-          .withContentString(InputRequest("", false))
-          .build
-
-        stdinClient ! inputRequestMessage
-
-        // Echo back the kernel message sent to have a signature injected
-        signatureManagerProbe.expectMsgPF() {
-          case kernelMessage: KernelMessage =>
-            signatureManagerProbe.reply(kernelMessage)
-            true
-        }
-
-        socketProbe.expectMsgPF() {
-          case zmqMessage: ZMQMessage =>
-            val kernelMessage: KernelMessage = zmqMessage
-            val inputReply =
-              Json.parse(kernelMessage.contentString).as[InputReply]
-            inputReply.value should be (expected)
-        }
-      }
-
-      it("should do nothing if the incoming message is not an input_request") {
-        val notInputRequestMessage: ZMQMessage = KMBuilder()
-          .withHeader(ClearOutput.toTypeString)
-          .build
-
-        stdinClient ! notInputRequestMessage
-
-        socketProbe.expectNoMsg(300.milliseconds)
-      }
-
-      it("should respond with an input_reply if the incoming message is " +
-        "an input_request") {
-        val inputRequestMessage: ZMQMessage = KMBuilder()
-          .withHeader(InputRequest.toTypeString)
-          .withContentString(InputRequest("", false))
-          .build
-
-        stdinClient ! inputRequestMessage
-
-        // Echo back the kernel message sent to have a signature injected
-        signatureManagerProbe.expectMsgPF() {
-          case kernelMessage: KernelMessage =>
-            signatureManagerProbe.reply(kernelMessage)
-            true
-        }
-
-        socketProbe.expectMsgPF() {
-          case zmqMessage: ZMQMessage =>
-            val kernelMessage: KernelMessage = zmqMessage
-            val messageType = kernelMessage.header.msg_type
-            messageType should be (InputReply.toTypeString)
-        }
-      }
-
-      it("should use the result from the response function if the incoming " +
-        "message is an input_request") {
-        val inputRequestMessage: ZMQMessage = KMBuilder()
-          .withHeader(InputRequest.toTypeString)
-          .withContentString(InputRequest("", false))
-          .build
-
-        stdinClient ! inputRequestMessage
-
-        // Echo back the kernel message sent to have a signature injected
-        signatureManagerProbe.expectMsgPF() {
-          case kernelMessage: KernelMessage =>
-            signatureManagerProbe.reply(kernelMessage)
-            true
-        }
-
-        socketProbe.expectMsgPF() {
-          case zmqMessage: ZMQMessage =>
-            val kernelMessage: KernelMessage = zmqMessage
-            val inputReply =
-              Json.parse(kernelMessage.contentString).as[InputReply]
-            inputReply.value should be (TestReplyString)
-        }
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/comm/ClientCommManagerSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/comm/ClientCommManagerSpec.scala b/client/src/test/scala/org/apache/toree/comm/ClientCommManagerSpec.scala
new file mode 100644
index 0000000..85ef4aa
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/comm/ClientCommManagerSpec.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.ibm.spark.comm
+
+import com.ibm.spark.kernel.protocol.v5
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.content.CommContent
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
+
+/**
+ * Exercises ClientCommManager#open and inspects the writer it creates.
+ */
+class ClientCommManagerSpec
+  extends FunSpec with Matchers with BeforeAndAfter with MockitoSugar
+{
+  private val TestTargetName = "some target"
+
+  // Collaborators and the manager under test, recreated before each test
+  private var mockActorLoader: ActorLoader = _
+  private var mockKMBuilder: KMBuilder = _
+  private var mockCommRegistrar: CommRegistrar = _
+  private var clientCommManager: ClientCommManager = _
+
+  // The un-spied writer most recently produced by newCommWriter
+  private var generatedCommWriter: CommWriter = _
+
+  before {
+    mockActorLoader = mock[ActorLoader]
+    mockKMBuilder = mock[KMBuilder]
+    mockCommRegistrar = mock[CommRegistrar]
+
+    clientCommManager = new ClientCommManager(
+      mockActorLoader, mockKMBuilder, mockCommRegistrar
+    ) {
+      override protected def newCommWriter(commId: UUID): CommWriter = {
+        // Keep hold of the real writer so its concrete type can be asserted on
+        generatedCommWriter = super.newCommWriter(commId)
+
+        // Hand back a spy whose sendCommKernelMessage is stubbed to do nothing
+        val silentWriter = spy(generatedCommWriter)
+        doNothing().when(silentWriter)
+          .sendCommKernelMessage(any[KernelMessageContent with CommContent])
+        silentWriter
+      }
+    }
+  }
+
+  describe("ClientCommManager") {
+    describe("#open") {
+      it("should return a wrapped instance of ClientCommWriter") {
+        clientCommManager.open(TestTargetName, v5.MsgData.Empty)
+
+        // The captured writer is exposed (hackishly) by the override above
+        generatedCommWriter shouldBe a [ClientCommWriter]
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/comm/ClientCommWriterSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/comm/ClientCommWriterSpec.scala b/client/src/test/scala/org/apache/toree/comm/ClientCommWriterSpec.scala
new file mode 100644
index 0000000..f5a4b43
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/comm/ClientCommWriterSpec.scala
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.ibm.spark.comm
+
+import java.util.UUID
+
+import akka.actor.{ActorSelection, ActorSystem}
+import akka.testkit.{TestKit, TestProbe}
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.typesafe.config.ConfigFactory
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
+import play.api.libs.json.Json
+
+import scala.concurrent.duration._
+
+/** Holds the Akka configuration text handed to the spec's ActorSystem. */
+object ClientCommWriterSpec {
+  // Read through ConfigFactory.parseString when the test ActorSystem is created
+  val config: String = """
+    akka {
+      loglevel = "WARNING"
+    }"""
+}
+
+class ClientCommWriterSpec extends TestKit(
+  ActorSystem("ClientCommWriterSpec",
+    ConfigFactory.parseString(ClientCommWriterSpec.config))
+) with FunSpecLike with Matchers with BeforeAndAfter with MockitoSugar
+{
+
+  private val commId = UUID.randomUUID().toString
+  private var clientCommWriter: ClientCommWriter = _
+  private var kernelMessageBuilder: KMBuilder = _
+
+  private var actorLoader: ActorLoader = _
+  private var shellSocketProbe: TestProbe = _
+
+  /**
+   * Retrieves the next message delivered to the shell socket probe.
+   *
+   * Uses expectMsgClass so that a timeout (or a message of another type)
+   * fails the test with an assertion error, instead of returning null and
+   * surfacing later as a NullPointerException in the caller.
+   *
+   * @return The KernelMessage instance received within 200 milliseconds
+   */
+  private def getNextMessage: KernelMessage =
+    shellSocketProbe.expectMsgClass(200.milliseconds, classOf[KernelMessage])
+
+  /**
+   * Retrieves the next message available and returns its type.
+   *
+   * @return The type of the message (pulled from message header)
+   */
+  private def getNextMessageType = getNextMessage.header.msg_type
+
+  /**
+   * Retrieves the next message available and parses the content string.
+   *
+   * @tparam T The type to coerce the content string into
+   *
+   * @return The resulting KernelMessageContent instance
+   */
+  private def getNextMessageContents[T <: KernelMessageContent]
+    (implicit fjs: play.api.libs.json.Reads[T], mf: Manifest[T]) =
+  {
+    val receivedMessage = getNextMessage
+
+    // Content travels as a JSON string; decode it with the implicit Reads[T]
+    Json.parse(receivedMessage.contentString).as[T]
+  }
+
+  // Rebuilds the probe, the mocked loader and the writer under test for each test
+  before {
+    // Probe that receives whatever is sent to the shell client socket
+    shellSocketProbe = TestProbe()
+    val probePath = shellSocketProbe.ref.path.toString
+    val probeSelection: ActorSelection = system.actorSelection(probePath)
+
+    // Mocked loader resolves the shell client socket to the probe
+    actorLoader = mock[ActorLoader]
+    doReturn(probeSelection).when(actorLoader).load(SocketType.ShellClient)
+
+    // Writer under test, built around a spied kernel message builder
+    kernelMessageBuilder = spy(KMBuilder())
+    clientCommWriter =
+      new ClientCommWriter(actorLoader, kernelMessageBuilder, commId)
+  }
+
+  describe("ClientCommWriter") {
+    // Behaviour of writeOpen: message type, comm id, target name and payload
+    describe("#writeOpen") {
+      // Arbitrary target name for the tests that do not care about its value.
+      // Mockito matchers such as anyString() are only meaningful while
+      // stubbing or verifying a mock; clientCommWriter is a real instance, so
+      // a plain literal is passed instead of leaving a stray matcher behind.
+      val anyTargetName = "<ANY_TARGET_NAME>"
+
+      it("should send a comm_open message to the relay") {
+        clientCommWriter.writeOpen(anyTargetName)
+
+        getNextMessageType should be (CommOpen.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        val expected = commId
+        clientCommWriter.writeOpen(anyTargetName)
+
+        val actual = getNextMessageContents[CommOpen].comm_id
+
+        actual should be (expected)
+      }
+
+      it("should include the target name in the message") {
+        val expected = "<TARGET_NAME>"
+        clientCommWriter.writeOpen(expected)
+
+        val actual = getNextMessageContents[CommOpen].target_name
+
+        actual should be (expected)
+      }
+
+      it("should provide empty data in the message if no data is provided") {
+        val expected = MsgData.Empty
+        clientCommWriter.writeOpen(anyTargetName)
+
+        val actual = getNextMessageContents[CommOpen].data
+
+        actual should be (expected)
+      }
+
+      it("should include the data in the message") {
+        val expected = MsgData("some key" -> "some value")
+        clientCommWriter.writeOpen(anyTargetName, expected)
+
+        val actual = getNextMessageContents[CommOpen].data
+
+        actual should be (expected)
+      }
+    }
+
+    // Behaviour of writeMsg: message type, comm id, null guard and payload
+    describe("#writeMsg") {
+      it("should send a comm_msg message to the relay") {
+        clientCommWriter.writeMsg(MsgData.Empty)
+        val messageType = getNextMessageType
+
+        messageType should be (CommMsg.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        clientCommWriter.writeMsg(MsgData.Empty)
+
+        getNextMessageContents[CommMsg].comm_id should be (commId)
+      }
+
+      it("should fail a require if the data is null") {
+        // A null payload is expected to raise IllegalArgumentException
+        intercept[IllegalArgumentException] { clientCommWriter.writeMsg(null) }
+      }
+
+      it("should include the data in the message") {
+        val payload = MsgData("some key" -> "some value")
+        clientCommWriter.writeMsg(payload)
+
+        getNextMessageContents[CommMsg].data should be (payload)
+      }
+    }
+
+    // Behaviour of writeClose: message type, comm id and optional payload
+    describe("#writeClose") {
+      it("should send a comm_close message to the relay") {
+        clientCommWriter.writeClose()
+        val messageType = getNextMessageType
+
+        messageType should be (CommClose.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        clientCommWriter.writeClose()
+
+        getNextMessageContents[CommClose].comm_id should be (commId)
+      }
+
+      it("should provide empty data in the message if no data is provided") {
+        clientCommWriter.writeClose()
+
+        getNextMessageContents[CommClose].data should be (MsgData.Empty)
+      }
+
+      it("should include the data in the message") {
+        val payload = MsgData("some key" -> "some value")
+        clientCommWriter.writeClose(payload)
+
+        getNextMessageContents[CommClose].data should be (payload)
+      }
+    }
+
+    // Behaviour of write when handed a one-character array
+    describe("#write") {
+      it("should send a comm_msg message to the relay") {
+        val chars = Array('a')
+        clientCommWriter.write(chars, 0, 1)
+
+        getNextMessageType should be (CommMsg.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        val chars = Array('a')
+        clientCommWriter.write(chars, 0, 1)
+
+        getNextMessageContents[CommMsg].comm_id should be (commId)
+      }
+
+      it("should package the string as part of the data with a 'message' key") {
+        val chars = Array('a')
+        clientCommWriter.write(chars, 0, 1)
+
+        // The written characters are expected under the "message" key
+        getNextMessageContents[CommMsg].data should be (MsgData("message" -> "a"))
+      }
+    }
+
+    // Smoke test only: flush() is invoked and no assertion is made afterwards
+    describe("#flush") {
+      it("should do nothing") {
+        // TODO: Is this test necessary? It does nothing.
+        clientCommWriter.flush()
+      }
+    }
+
+    // Behaviour of close: message type, comm id and empty payload
+    describe("#close") {
+      it("should send a comm_close message to the relay") {
+        clientCommWriter.close()
+        val messageType = getNextMessageType
+
+        messageType should be (CommClose.toTypeString)
+      }
+
+      it("should include the comm_id in the message") {
+        clientCommWriter.close()
+
+        getNextMessageContents[CommClose].comm_id should be (commId)
+      }
+
+      it("should provide empty data in the message") {
+        clientCommWriter.close()
+
+        getNextMessageContents[CommClose].data should be (MsgData.Empty)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/SparkKernelClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/SparkKernelClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/SparkKernelClientSpec.scala
new file mode 100644
index 0000000..8db29f8
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/SparkKernelClientSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client
+
+import akka.actor.ActorSystem
+import akka.testkit.{TestKit, TestProbe}
+import com.ibm.spark.comm.{CommCallbacks, CommStorage, CommRegistrar}
+import com.ibm.spark.kernel.protocol.v5
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.execution.ExecuteRequestTuple
+import scala.concurrent.duration._
+import org.mockito.Mockito._
+import org.mockito.Matchers.{eq => mockEq, _}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
+
+/**
+ * Checks that SparkKernelClient#execute delivers an ExecuteRequestTuple to the
+ * actor the loader returns for incoming execute requests.
+ */
+class SparkKernelClientSpec
+  extends TestKit(ActorSystem("SparkKernelClientActorSystem"))
+  with Matchers with MockitoSugar with FunSpecLike with BeforeAndAfter
+{
+  private val TestTargetName = "some target"
+
+  // Collaborators and probes, recreated before each test
+  private var mockActorLoader: ActorLoader = _
+  private var mockCommRegistrar: CommRegistrar = _
+  private var sparkKernelClient: SparkKernelClient = _
+  private var executeRequestProbe: TestProbe = _
+  private var shellClientProbe: TestProbe = _
+
+  before {
+    mockActorLoader = mock[ActorLoader]
+    mockCommRegistrar = mock[CommRegistrar]
+
+    // Route execute requests to a probe so the sent message can be inspected
+    executeRequestProbe = TestProbe()
+    when(mockActorLoader.load(MessageType.Incoming.ExecuteRequest))
+      .thenReturn(system.actorSelection(executeRequestProbe.ref.path.toString))
+
+    // Route the shell client socket to its own probe
+    shellClientProbe = TestProbe()
+    when(mockActorLoader.load(SocketType.ShellClient))
+      .thenReturn(system.actorSelection(shellClientProbe.ref.path.toString))
+
+    sparkKernelClient = new SparkKernelClient(
+      mockActorLoader, system, mockCommRegistrar)
+  }
+
+  describe("SparkKernelClient") {
+    describe("#execute") {
+      it("should send an ExecuteRequest message") {
+        sparkKernelClient.execute("val foo = 2")
+        executeRequestProbe.expectMsgClass(classOf[ExecuteRequestTuple])
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
new file mode 100644
index 0000000..db3205c
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/execution/DeferredExecutionTest.scala
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.execution
+
+import com.ibm.spark.kernel.protocol.v5.content.{StreamContent, ExecuteResult}
+import com.ibm.spark.kernel.protocol.v5.content._
+import com.ibm.spark.kernel.protocol.v5._
+import org.scalatest.{Matchers, FunSpec}
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.concurrent.Promise
+import scala.util.{Try, Failure, Success}
+
+/** Fixtures and callbacks shared by the DeferredExecution tests. */
+object DeferredExecutionTest {
+  val executeReplyError  = Option(ExecuteReplyError(1, None, None, None))
+  val executeReplyOk     = Option(ExecuteReplyOk(1, None, None))
+  val executeResult      = Option(ExecuteResult(1, Data(), Metadata()))
+
+  /**
+   * Mocks the behaviour of the kernel sending messages to us.
+   *
+   * @param deferredExecution The deferred to resolve
+   * @param executeReplyOption The reply to resolve with, if any
+   * @param executeResultOption The result to resolve with, if any; it is
+   *                            resolved before the reply
+   */
+  def mockSendingKernelMessages(deferredExecution: DeferredExecution,
+                                executeReplyOption: Option[ExecuteReply],
+                                executeResultOption: Option[ExecuteResult]): Unit = {
+    // foreach rather than map: the calls are made purely for their side
+    // effect, so there is no mapped Option worth building
+    executeResultOption.foreach { result =>
+      deferredExecution.resolveResult(result)
+    }
+    executeReplyOption.foreach { reply =>
+      deferredExecution.resolveReply(reply)
+    }
+  }
+
+  /** Result callback: completes the implicit promise with a Success. */
+  def processExecuteResult(executeResult: ExecuteResult)
+          (implicit promise: Promise[Try[ExecuteResultPromise]]): Unit = {
+    promise.success(Success(new ExecuteResultPromise))
+  }
+
+  /** Error callback: completes the implicit promise with a Success. */
+  def processExecuteReply(executeReplyError: ExecuteReplyError)
+          (implicit promise: Promise[Try[ExecuteReplyPromise]]): Unit = {
+    promise.success(Success(new ExecuteReplyPromise))
+  }
+
+  /** Success callback: completes the implicit promise with Success("Success"). */
+  def processOnSuccessfulCompletion(executeReplyOk: ExecuteReplyOk)
+    (implicit promise: Promise[Try[String]]): Unit = {
+    promise.success(Success("Success"))
+  }
+}
+
+// Empty marker types: they give the test promises distinct type parameters
+class ExecuteResultPromise
+class ExecuteReplyPromise
+
+class DeferredExecutionTest extends FunSpec with ScalaFutures with Matchers {
+  import DeferredExecutionTest._
+  describe("DeferredExecution") {
+    // Covers callbacks registered through onResult
+    describe("onResult( callback )"){
+      it("should run all onResult callbacks when ExecuteResult and " +
+         "successful ExecuteReply are returned") {
+        // Supplied implicitly to processExecuteResult, which completes it
+        implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onResult(processExecuteResult)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
+
+        whenReady(executeResultPromise.future) {
+          case Success(v) => assert(true)
+          case Failure(exception: Throwable) =>
+            fail("Promise resolved with failure when processing " +
+                 "execute result.", exception)
+          // Defensive fallback for anything that is neither Success nor Failure
+          case unknownValue=>
+            fail(s"Promised resolved with unknown value: ${unknownValue}")
+        }
+      }
+      it("should run all onResultCallbacks registered after deferred has " +
+         "been resolved") {
+        val executeResultPromise: Promise[Int] = Promise()
+        var counter = 0
+        // Local callback (shadows the imported processExecuteResult): counts
+        // its invocations and completes the promise on the second one
+        def processExecuteResult (executeResult: ExecuteResult) : Unit = {
+          counter = counter + 1
+          // Hack to allow callbacks to occur after meeting our assertion value
+          if(counter == 2) {
+            Thread.sleep(500)
+            executeResultPromise.success(counter)
+          }
+        }
+
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onResult(processExecuteResult)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
+        //  register callback after code execution has completed
+        deferredExecution.onResult(processExecuteResult)
+
+        whenReady(executeResultPromise.future){ _ => counter should be(2) }
+
+      }
+      it("should not run onResult callbacks when ExecuteReply is a failure") {
+        //  This promise should be resolved by the deferred
+        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
+        //  This promise should not be resolved by the deferred
+        implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
+
+        // Each callback receives the implicit promise matching its parameter type
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onError(processExecuteReply)
+          .onResult(processExecuteResult)
+        mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
+
+        // Once the error callback has fired, the result promise must be untouched
+        whenReady(executeReplyPromise.future) { _ =>
+          executeResultPromise.isCompleted should be(false)
+        }
+      }
+    }
+    // Covers callbacks registered through onStream
+    describe("onStream( callback )"){
+      it("should execute all streaming callbacks") {
+        var counter = 0
+        val streamingResultPromise: Promise[Int] = Promise()
+        // Counts invocations and completes the promise on the fourth one
+        def processStreamContent (streamContent: StreamContent) : Unit = {
+          counter = counter + 1
+          // 2 registrations x 2 emitted contents = 4 expected invocations
+          if (counter == 4)
+            streamingResultPromise.success(counter)
+        }
+
+        // The same callback is registered twice
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onStream(processStreamContent)
+          .onStream(processStreamContent)
+
+        deferredExecution.emitStreamContent(StreamContent("stdout","msg"))
+        deferredExecution.emitStreamContent(StreamContent("stdout","msg2"))
+
+        whenReady(streamingResultPromise.future){ counterValue =>
+          counterValue should be(4)
+        }
+      }
+    }
+    
+    // Covers callbacks registered through onError
+    describe("onError( callback )") {
+      it("should run all onError callbacks") {
+        // Supplied implicitly to processExecuteReply, which completes it
+        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onError(processExecuteReply)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
+
+        whenReady(executeReplyPromise.future) {
+          case Success(v) => assert(true)
+          case Failure(exception: Throwable) =>
+            fail("Promised resolved with failure while trying to " +
+                 "process execute result.", exception)
+          // Defensive fallback for anything that is neither Success nor Failure
+          case unknownValue=>
+            fail(s"Promised resolved with unknown value: ${unknownValue}")
+        }
+      }
+      it("should not run onError callbacks when ExecuteReply is a success") {
+        //  This promise and callback should not be executed by the deferred
+        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
+        //  This promise and callback should be executed by the deferred
+        implicit val executeResultPromise: Promise[Try[ExecuteResultPromise]] = Promise()
+
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onError(processExecuteReply)
+          .onResult(processExecuteResult)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
+
+        // Once the result callback has fired, the error promise must be untouched
+        whenReady(executeResultPromise.future) {
+          case _ =>
+            executeReplyPromise.isCompleted should be(false)
+        }
+      }
+    }
+    // Covers callbacks registered through onSuccess
+    describe("onSuccessfulCompletion( callback )") {
+      it("should run all onSuccessfulCompletion callbacks on ExecuteReplyOk and ExecuteResult") {
+        // Supplied implicitly to processOnSuccessfulCompletion, which completes it
+        implicit val executeCompletePromise: Promise[Try[String]] = Promise()
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onSuccess(processOnSuccessfulCompletion)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, executeResult)
+
+        whenReady(executeCompletePromise.future) {
+          case Success(s) =>  //  Nothing to do for the successful case
+          case Failure(exception: Throwable) =>
+            fail("Promised resolved with failure while trying to " +
+              "process execute result.", exception)
+          // Defensive fallback for anything that is neither Success nor Failure
+          case unknownValue=>
+            fail(s"Promised resolved with unknown value: ${unknownValue}")
+        }
+      }
+      it("should run all onSuccessfulCompletion callbacks on ExecuteReplyOk and None") {
+        implicit val executeCompletePromise: Promise[Try[String]] = Promise()
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onSuccess(processOnSuccessfulCompletion)
+
+        // Only the reply is sent here; no ExecuteResult is resolved
+        mockSendingKernelMessages(deferredExecution, executeReplyOk, None)
+
+        whenReady(executeCompletePromise.future) {
+          case Success(s) =>  //  Nothing to do for the successful case
+          case Failure(exception: Throwable) =>
+            fail("Promised resolved with failure while trying to " +
+              "process execute result.", exception)
+          // Defensive fallback for anything that is neither Success nor Failure
+          case unknownValue=>
+            fail(s"Promised resolved with unknown value: ${unknownValue}")
+        }
+      }
+      it("should not run onSuccessfulCompletion callbacks on ExecuteReplyError") {
+        // Handed implicitly to the success callback; expected to stay uncompleted
+        implicit val executeCompletePromise: Promise[Try[String]] = Promise()
+        // Handed implicitly to the error callback, which completes it
+        implicit val executeReplyPromise: Promise[Try[ExecuteReplyPromise]] = Promise()
+
+        val deferredExecution: DeferredExecution = DeferredExecution()
+          .onError(processExecuteReply)
+          .onSuccess(processOnSuccessfulCompletion)
+
+        mockSendingKernelMessages(deferredExecution, executeReplyError, executeResult)
+
+        // Once the error callback has fired, the success promise must be untouched
+        whenReady(executeReplyPromise.future) {
+          case _ =>
+            executeCompletePromise.isCompleted should be(false)
+        }
+      }
+    }
+  }
+}