You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:35 UTC

[27/51] [abbrv] incubator-toree git commit: Moved scala files to new locations based on new package

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
new file mode 100644
index 0000000..9fdd702
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.testkit.{TestProbe, ImplicitSender, TestKit}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSpecLike}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+
+class HeartbeatClientSpec extends TestKit(ActorSystem("HeartbeatActorSpec"))
+  with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
+
+  describe("HeartbeatClientActor") {
+    val socketFactory = mock[SocketFactory]
+    val mockActorLoader = mock[ActorLoader]
+    val probe : TestProbe = TestProbe()
+    when(socketFactory.HeartbeatClient(any(classOf[ActorSystem]), any(classOf[ActorRef]))).thenReturn(probe.ref)
+
+    val heartbeatClient = system.actorOf(Props(
+      classOf[HeartbeatClient], socketFactory, mockActorLoader, true
+    ))
+
+    describe("send heartbeat") {
+      it("should send ping ZMQMessage") {
+        heartbeatClient ! HeartbeatMessage
+        probe.expectMsgClass(classOf[ZMQMessage])
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
new file mode 100644
index 0000000..b592dcd
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import java.util.UUID
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.pattern.ask
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import akka.util.Timeout
+import com.ibm.spark.comm.{CommCallbacks, CommRegistrar, CommStorage, CommWriter}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.Utilities._
+import com.ibm.spark.kernel.protocol.v5.client.execution.{DeferredExecution, DeferredExecutionManager}
+import com.ibm.spark.kernel.protocol.v5.client.{ActorLoader, Utilities}
+import com.ibm.spark.kernel.protocol.v5.content.{CommClose, CommMsg, CommOpen, StreamContent}
+import com.typesafe.config.ConfigFactory
+import org.mockito.Matchers.{eq => mockEq, _}
+import org.mockito.Mockito._
+import org.scalatest.concurrent.{Eventually, ScalaFutures}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.time.{Milliseconds, Span}
+import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
+import play.api.libs.json.Json
+
+import scala.concurrent.duration._
+import scala.concurrent.{Future, Promise}
+import scala.util.Failure
+
+object IOPubClientSpec {
+  val config ="""
+    akka {
+      loglevel = "WARNING"
+    }"""
+}
+
+class IOPubClientSpec extends TestKit(ActorSystem(
+  "IOPubClientSpecSystem", ConfigFactory.parseString(IOPubClientSpec.config)
+)) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
+  with ScalaFutures with BeforeAndAfter with Eventually
+{
+  private val TestTimeout = Timeout(10.seconds)
+  implicit override val patienceConfig = PatienceConfig(
+    timeout = scaled(Span(200, Milliseconds)),
+    interval = scaled(Span(5, Milliseconds))
+  )
+  private val SignatureEnabled = true
+
+  private var clientSocketProbe: TestProbe = _
+  private var mockClientSocketFactory: SocketFactory = _
+  private var mockActorLoader: ActorLoader = _
+  private var mockCommRegistrar: CommRegistrar = _
+  private var spyCommStorage: CommStorage = _
+  private var mockCommCallbacks: CommCallbacks = _
+  private var ioPubClient: ActorRef = _
+
+  private var kmBuilder: KMBuilder = _
+
+  private val id = UUID.randomUUID().toString
+  private val TestTargetName = "some target"
+  private val TestCommId = UUID.randomUUID().toString
+
+  before {
+    kmBuilder = KMBuilder()
+    mockCommCallbacks = mock[CommCallbacks]
+    mockCommRegistrar = mock[CommRegistrar]
+
+    spyCommStorage = spy(new CommStorage())
+
+    clientSocketProbe = TestProbe()
+    mockActorLoader = mock[ActorLoader]
+    mockClientSocketFactory = mock[SocketFactory]
+
+    //  Stub the return value for the socket factory
+    when(mockClientSocketFactory.IOPubClient(anyObject(), any[ActorRef]))
+      .thenReturn(clientSocketProbe.ref)
+
+    //  Construct the object we will test against
+    ioPubClient = system.actorOf(Props(
+      classOf[IOPubClient], mockClientSocketFactory, mockActorLoader,
+      SignatureEnabled, mockCommRegistrar, spyCommStorage
+    ))
+  }
+
+  describe("IOPubClient") {
+    describe("#receive") {
+      it("should execute all Comm open callbacks on comm_open message") {
+        val message: ZMQMessage = kmBuilder
+          .withHeader(CommOpen.toTypeString)
+          .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
+          .build
+
+        // Mark as target being provided
+        doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
+          .getTargetCallbacks(anyString())
+
+        // Simulate receiving a message from the kernel
+        ioPubClient ! message
+
+        // Check to see if "eventually" the callback is triggered
+        eventually {
+          verify(mockCommCallbacks).executeOpenCallbacks(
+            any[CommWriter], mockEq(TestCommId),
+            mockEq(TestTargetName), any[v5.MsgData])
+        }
+      }
+
+      it("should not execute Comm open callbacks if the target is not found") {
+        val message: ZMQMessage = kmBuilder
+          .withHeader(CommOpen.toTypeString)
+          .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
+          .build
+
+        // Mark as target NOT being provided
+        doReturn(None).when(spyCommStorage).getTargetCallbacks(anyString())
+
+        // Simulate receiving a message from the kernel
+        ioPubClient ! message
+
+        // Check to see if "eventually" the callback is NOT triggered
+        eventually {
+          // Check that we have checked if the target exists
+          verify(spyCommStorage).getTargetCallbacks(TestTargetName)
+
+          verify(mockCommCallbacks, never()).executeOpenCallbacks(
+            any[CommWriter], mockEq(TestCommId),
+            mockEq(TestTargetName), any[v5.MsgData])
+          verify(mockCommRegistrar, never()).link(TestTargetName, TestCommId)
+        }
+      }
+
+      it("should execute all Comm msg callbacks on comm_msg message") {
+        val message: ZMQMessage = kmBuilder
+          .withHeader(CommMsg.toTypeString)
+          .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
+          .build
+
+        // Mark as target being provided
+        doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
+          .getCommIdCallbacks(any[v5.UUID])
+
+        // Simulate receiving a message from the kernel
+        ioPubClient ! message
+
+        // Check to see if "eventually" the callback is triggered
+        eventually {
+          verify(mockCommCallbacks).executeMsgCallbacks(
+            any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
+        }
+      }
+
+      it("should not execute Comm msg callbacks if the Comm id is not found") {
+        val message: ZMQMessage = kmBuilder
+          .withHeader(CommMsg.toTypeString)
+          .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
+          .build
+
+        // Mark as target NOT being provided
+        doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID])
+
+        // Simulate receiving a message from the kernel
+        ioPubClient ! message
+
+        // Check to see if "eventually" the callback is NOT triggered
+        eventually {
+          // Check that we have checked if the target exists
+          verify(spyCommStorage).getCommIdCallbacks(TestCommId)
+
+          verify(mockCommCallbacks, never()).executeMsgCallbacks(
+            any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
+        }
+      }
+
+      it("should execute all Comm close callbacks on comm_close message") {
+        val message: ZMQMessage = kmBuilder
+          .withHeader(CommClose.toTypeString)
+          .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
+          .build
+
+        // Mark as target being provided
+        doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
+          .getCommIdCallbacks(any[v5.UUID])
+
+        // Simulate receiving a message from the kernel
+        ioPubClient ! message
+
+        // Check to see if "eventually" the callback is triggered
+        eventually {
+          verify(mockCommCallbacks).executeCloseCallbacks(
+            any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
+        }
+      }
+
+      it("should not execute Comm close callbacks if Comm id is not found") {
+        val message: ZMQMessage = kmBuilder
+          .withHeader(CommClose.toTypeString)
+          .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
+          .build
+
+        // Mark as target NOT being provided
+        doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID])
+
+        // Simulate receiving a message from the kernel
+        ioPubClient ! message
+
+        // Check to see if "eventually" the callback is NOT triggered
+        eventually {
+          // Check that we have checked if the target exists
+          verify(spyCommStorage).getCommIdCallbacks(TestCommId)
+
+          verify(mockCommCallbacks, never()).executeCloseCallbacks(
+            any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
+        }
+      }
+
+      it("should call a registered callback on stream message") {
+        val result = StreamContent("foo", "bar")
+        val header = Header(id, "spark", id,
+          MessageType.Outgoing.Stream.toString, "5.0")
+        val parentHeader = Header(id, "spark", id,
+          MessageType.Incoming.ExecuteRequest.toString, "5.0")
+
+        val kernelMessage = new KernelMessage(
+          Seq[String](),
+          "",
+          header,
+          parentHeader,
+          Metadata(),
+          Json.toJson(result).toString()
+        )
+        val promise: Promise[String] = Promise()
+        val de: DeferredExecution = DeferredExecution().onStream(
+          (content: StreamContent) => {
+            promise.success(content.text)
+          }
+        )
+        DeferredExecutionManager.add(id, de)
+        // Send the message to the IOPubClient
+        val zmqMessage: ZMQMessage = kernelMessage
+
+        ioPubClient ! zmqMessage
+
+        whenReady(promise.future) {
+          case res: String =>
+            res should be eq("bar")
+          case _ =>
+            fail(s"Received failure when asking IOPubClient")
+        }
+      }
+
+      it("should not invoke callback when stream message's parent header is null") {
+        // Construct the kernel message
+        val result = StreamContent("foo", "bar")
+        val header = Header(id, "spark", id,
+          MessageType.Outgoing.Stream.toString, "5.0")
+
+        val kernelMessage = new KernelMessage(
+          Seq[String](),
+          "",
+          header,
+          null,
+          Metadata(),
+          Json.toJson(result).toString()
+        )
+
+        // Send the message to the IOPubClient
+        val zmqMessage: ZMQMessage = kernelMessage
+        val futureResult: Future[Any] = ioPubClient.ask(zmqMessage)(TestTimeout)
+        whenReady(futureResult) {
+          case result: Failure[Any] =>
+            //  Getting the value of the failure will cause the underlying exception will be thrown
+            try {
+              result.get
+            } catch {
+              case t:RuntimeException =>
+                t.getMessage should be("Parent Header was null in Kernel Message.")
+            }
+          case result =>
+            fail(s"Did not receive failure!! ${result}")
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
new file mode 100644
index 0000000..0110dfd
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import java.util.UUID
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.testkit.{TestProbe, ImplicitSender, TestKit}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.communication.security.SecurityActorType
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.content.ExecuteRequest
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSpecLike}
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import play.api.libs.json.Json
+
+class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec"))
+  with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
+  private val SignatureEnabled = true
+
+  describe("ShellClientActor") {
+    val socketFactory = mock[SocketFactory]
+    val mockActorLoader = mock[ActorLoader]
+    val probe : TestProbe = TestProbe()
+    when(socketFactory.ShellClient(
+      any(classOf[ActorSystem]), any(classOf[ActorRef])
+    )).thenReturn(probe.ref)
+
+    val signatureManagerProbe = TestProbe()
+    doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString))
+      .when(mockActorLoader).load(SecurityActorType.SignatureManager)
+
+    val shellClient = system.actorOf(Props(
+      classOf[ShellClient], socketFactory, mockActorLoader, SignatureEnabled
+    ))
+
+    describe("send execute request") {
+      it("should send execute request") {
+        val request = ExecuteRequest(
+          "foo", false, true, UserExpressions(), true
+        )
+        val header = Header(
+          UUID.randomUUID().toString, "spark",
+          UUID.randomUUID().toString, MessageType.Incoming.ExecuteRequest.toString,
+          "5.0"
+        )
+        val kernelMessage = KernelMessage(
+          Seq[String](), "",
+          header, HeaderBuilder.empty,
+          Metadata(), Json.toJson(request).toString
+        )
+        shellClient ! kernelMessage
+
+        // Echo back the kernel message sent to have a signature injected
+        signatureManagerProbe.expectMsgClass(classOf[KernelMessage])
+        signatureManagerProbe.reply(kernelMessage)
+
+        probe.expectMsgClass(classOf[ZMQMessage])
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala
new file mode 100644
index 0000000..877c4f5
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import akka.actor.{ActorRef, Props, ActorSystem}
+import akka.testkit.{TestProbe, ImplicitSender, TestKit}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.communication.security.SecurityActorType
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.client.socket.StdinClient.ResponseFunction
+import com.ibm.spark.kernel.protocol.v5.content.{InputReply, InputRequest, ClearOutput, ExecuteRequest}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
+import com.ibm.spark.kernel.protocol.v5.client.Utilities._
+import play.api.libs.json.Json
+import scala.concurrent.duration._
+
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+
+class StdinClientSpec extends TestKit(ActorSystem("StdinActorSpec"))
+  with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
+  with BeforeAndAfter
+{
+  private val SignatureEnabled = true
+  private val TestReplyString = "some value"
+  private val TestResponseFunc: ResponseFunction = (_, _) => TestReplyString
+
+  private var mockSocketFactory: SocketFactory = _
+  private var mockActorLoader: ActorLoader = _
+  private var signatureManagerProbe: TestProbe = _
+  private var socketProbe: TestProbe = _
+  private var stdinClient: ActorRef = _
+
+  before {
+    socketProbe = TestProbe()
+    signatureManagerProbe = TestProbe()
+    mockSocketFactory = mock[SocketFactory]
+    mockActorLoader = mock[ActorLoader]
+    doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString))
+      .when(mockActorLoader).load(SecurityActorType.SignatureManager)
+    doReturn(socketProbe.ref).when(mockSocketFactory)
+      .StdinClient(any[ActorSystem], any[ActorRef])
+
+    stdinClient = system.actorOf(Props(
+      classOf[StdinClient], mockSocketFactory, mockActorLoader, SignatureEnabled
+    ))
+
+    // Set the response function for our client socket
+    stdinClient ! TestResponseFunc
+  }
+
+  describe("StdinClient") {
+    describe("#receive") {
+      it("should update the response function if receiving a new one") {
+        val expected = "some other value"
+        val replacementFunc: ResponseFunction = (_, _) => expected
+
+        // Update the function
+        stdinClient ! replacementFunc
+
+        val inputRequestMessage: ZMQMessage = KMBuilder()
+          .withHeader(InputRequest.toTypeString)
+          .withContentString(InputRequest("", false))
+          .build
+
+        stdinClient ! inputRequestMessage
+
+        // Echo back the kernel message sent to have a signature injected
+        signatureManagerProbe.expectMsgPF() {
+          case kernelMessage: KernelMessage =>
+            signatureManagerProbe.reply(kernelMessage)
+            true
+        }
+
+        socketProbe.expectMsgPF() {
+          case zmqMessage: ZMQMessage =>
+            val kernelMessage: KernelMessage = zmqMessage
+            val inputReply =
+              Json.parse(kernelMessage.contentString).as[InputReply]
+            inputReply.value should be (expected)
+        }
+      }
+
+      it("should do nothing if the incoming message is not an input_request") {
+        val notInputRequestMessage: ZMQMessage = KMBuilder()
+          .withHeader(ClearOutput.toTypeString)
+          .build
+
+        stdinClient ! notInputRequestMessage
+
+        socketProbe.expectNoMsg(300.milliseconds)
+      }
+
+      it("should respond with an input_reply if the incoming message is " +
+        "an input_request") {
+        val inputRequestMessage: ZMQMessage = KMBuilder()
+          .withHeader(InputRequest.toTypeString)
+          .withContentString(InputRequest("", false))
+          .build
+
+        stdinClient ! inputRequestMessage
+
+        // Echo back the kernel message sent to have a signature injected
+        signatureManagerProbe.expectMsgPF() {
+          case kernelMessage: KernelMessage =>
+            signatureManagerProbe.reply(kernelMessage)
+            true
+        }
+
+        socketProbe.expectMsgPF() {
+          case zmqMessage: ZMQMessage =>
+            val kernelMessage: KernelMessage = zmqMessage
+            val messageType = kernelMessage.header.msg_type
+            messageType should be (InputReply.toTypeString)
+        }
+      }
+
+      it("should use the result from the response function if the incoming " +
+        "message is an input_request") {
+        val inputRequestMessage: ZMQMessage = KMBuilder()
+          .withHeader(InputRequest.toTypeString)
+          .withContentString(InputRequest("", false))
+          .build
+
+        stdinClient ! inputRequestMessage
+
+        // Echo back the kernel message sent to have a signature injected
+        signatureManagerProbe.expectMsgPF() {
+          case kernelMessage: KernelMessage =>
+            signatureManagerProbe.reply(kernelMessage)
+            true
+        }
+
+        socketProbe.expectMsgPF() {
+          case zmqMessage: ZMQMessage =>
+            val kernelMessage: KernelMessage = zmqMessage
+            val inputReply =
+              Json.parse(kernelMessage.contentString).as[InputReply]
+            inputReply.value should be (TestReplyString)
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala b/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
deleted file mode 100644
index 994360f..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication
-
-import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
-import com.ibm.spark.communication.socket._
-import org.zeromq.ZMQ
-
-import scala.collection.JavaConverters._
-
-/**
- * Represents the factory for sockets that also manages ZMQ contexts and
- * facilitates closing of sockets created by the factory.
- */
-class SocketManager {
-  /**
-   * Creates a new ZMQ context with a single IO thread.
-   *
-   * @return The new ZMQ context
-   */
-  protected def newZmqContext(): ZMQ.Context = ZMQ.context(1)
-
-  private val socketToContextMap =
-    new ConcurrentHashMap[SocketLike, ZMQ.Context]().asScala
-
-  /**
-   * Provides and registers a new ZMQ context, used for creating a new socket.
-   * @param mkSocket a function that creates a socket using a given context
-   * @return the new socket
-   * @see newZmqContext
-   */
-  private def withNewContext[A <: SocketLike](mkSocket: ZMQ.Context => A): A = {
-    val ctx = newZmqContext()
-    val socket = mkSocket(ctx)
-    socketToContextMap.put(socket, ctx)
-    socket
-  }
-
-  /**
-   * Closes the socket provided and also closes the context if no more sockets
-   * are using the context.
-   *
-   * @param socket The socket to close
-   */
-  def closeSocket(socket: SocketLike) = {
-    socket.close()
-
-    socketToContextMap.remove(socket).foreach(context => {
-      if (!socketToContextMap.values.exists(_ == context)) context.close()
-    })
-  }
-
-  /**
-   * Creates a new request socket.
-   *
-   * @param address The address to associate with the socket
-   * @param inboundMessageCallback The callback to use for incoming messages
-   *
-   * @return The new socket instance
-   */
-  def newReqSocket(
-    address: String,
-    inboundMessageCallback: (Seq[String]) => Unit
-  ): SocketLike = withNewContext{ ctx =>
-     new JeroMQSocket(new ReqSocketRunnable(
-      ctx,
-      Some(inboundMessageCallback),
-      Connect(address),
-      Linger(0)
-    ))
-  }
-
-  /**
-   * Creates a new reply socket.
-   *
-   * @param address The address to associate with the socket
-   * @param inboundMessageCallback The callback to use for incoming messages
-   *
-   * @return The new socket instance
-   */
-  def newRepSocket(
-    address: String,
-    inboundMessageCallback: (Seq[String]) => Unit
-  ): SocketLike = withNewContext{ ctx =>
-    new JeroMQSocket(new ZeroMQSocketRunnable(
-      ctx,
-      RepSocket,
-      Some(inboundMessageCallback),
-      Bind(address),
-      Linger(0)
-    ))
-  }
-
-  /**
-   * Creates a new publish socket.
-   *
-   * @param address The address to associate with the socket
-   *
-   * @return The new socket instance
-   */
-  def newPubSocket(
-    address: String
-  ): SocketLike = withNewContext{ ctx =>
-    new JeroMQSocket(new PubSocketRunnable(
-      ctx,
-      Bind(address),
-      Linger(0)
-    ))
-  }
-
-  /**
-   * Creates a new subscribe socket.
-   *
-   * @param address The address to associate with the socket
-   * @param inboundMessageCallback The callback to use for incoming messages
-   *
-   * @return The new socket instance
-   */
-  def newSubSocket(
-    address: String,
-    inboundMessageCallback: (Seq[String]) => Unit
-  ): SocketLike = withNewContext { ctx =>
-    new JeroMQSocket(new ZeroMQSocketRunnable(
-      ctx,
-      SubSocket,
-      Some(inboundMessageCallback),
-      Connect(address),
-      Linger(0),
-      Subscribe.all
-    ))
-  }
-
-  /**
-   * Creates a new router socket.
-   *
-   * @param address The address to associate with the socket
-   * @param inboundMessageCallback The callback to use for incoming messages
-   *
-   * @return The new socket instance
-   */
-  def newRouterSocket(
-    address: String,
-    inboundMessageCallback: (Seq[String]) => Unit
-  ): SocketLike = withNewContext { ctx =>
-    new JeroMQSocket(new ZeroMQSocketRunnable(
-      ctx,
-      RouterSocket,
-      Some(inboundMessageCallback),
-      Bind(address),
-      Linger(0)
-    ))
-  }
-
-  /**
-   * Creates a new dealer socket.
-   *
-   * @param address The address to associate with the socket
-   * @param inboundMessageCallback The callback to use for incoming messages
-   *
-   * @return The new socket instance
-   */
-  def newDealerSocket(
-    address: String,
-    inboundMessageCallback: (Seq[String]) => Unit,
-    identity: String = UUID.randomUUID().toString
-  ): SocketLike = withNewContext{ ctx =>
-    new JeroMQSocket(new ZeroMQSocketRunnable(
-      ctx,
-      DealerSocket,
-      Some(inboundMessageCallback),
-      Connect(address),
-      Linger(0),
-      Identity(identity.getBytes(ZMQ.CHARSET))
-    ))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala b/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala
deleted file mode 100644
index ffa7705..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication
-
-import akka.util.ByteString
-
-/**
- * Represents a ZeroMQ message containing a collection of Akka ByteString
- * instances.
- *
- * @note This is left in for backwards compatibility!
- *
- * @param frames The collection of Akka ByteString instances
- */
-case class ZMQMessage(frames: ByteString*) {
-  def frame(i: Int) = frames(i)
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala
deleted file mode 100644
index 0f0497d..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{ZMQMessage, SocketManager}
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a dealer socket.
- *
- * @param connection The address to connect to
- * @param listener The actor to send incoming messages back to
- */
-class DealerSocketActor(connection: String, listener: ActorRef)
-  extends Actor with LogLike
-{
-  logger.debug(s"Initializing dealer socket actor for $connection")
-  private val manager: SocketManager = new SocketManager
-  private val socket = manager.newDealerSocket(connection, (message: Seq[String]) => {
-    listener ! ZMQMessage(message.map(ByteString.apply): _*)
-  })
-
-  override def postStop(): Unit = {
-    manager.closeSocket(socket)
-  }
-
-  override def receive: Actor.Receive = {
-    case zmqMessage: ZMQMessage =>
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
-      socket.send(frames: _*)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala
deleted file mode 100644
index f74764e..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.Actor
-import com.ibm.spark.communication.utils.OrderedSupport
-import com.ibm.spark.communication.{SocketManager, ZMQMessage}
-import com.ibm.spark.kernel.protocol.v5.KernelMessage
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a publish socket.
- *
- * Note: OrderedSupport is used to ensure correct processing order.
- *       A similar pattern may be useful for other socket actors if
- *       issues arise in the future.
- *
- * @param connection The address to bind to
- */
-class PubSocketActor(connection: String)
-  extends Actor with LogLike with OrderedSupport
-{
-  logger.debug(s"Initializing publish socket actor for $connection")
-  private val manager: SocketManager = new SocketManager
-  private val socket = manager.newPubSocket(connection)
-
-  override def postStop(): Unit = {
-    manager.closeSocket(socket)
-  }
-
-  override def receive: Actor.Receive = {
-    case zmqMessage: ZMQMessage => withProcessing {
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
-
-      socket.send(frames: _*)
-    }
-  }
-
-  /**
-   * Defines the types that will be stashed by {@link #waiting() waiting}
-   * while the Actor is in processing state.
-   * @return
-   */
-  override def orderedTypes(): Seq[Class[_]] = Seq(classOf[ZMQMessage])
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala
deleted file mode 100644
index b8643f5..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{SocketManager, ZMQMessage}
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a reply socket.
- *
- * @param connection The address to bind to
- * @param listener The actor to send incoming messages back to
- */
-class RepSocketActor(connection: String, listener: ActorRef)
-  extends Actor with LogLike
-{
-  logger.debug(s"Initializing reply socket actor for $connection")
-  private val manager: SocketManager = new SocketManager
-  private val socket = manager.newRepSocket(connection, (message: Seq[String]) => {
-    listener ! ZMQMessage(message.map(ByteString.apply): _*)
-  })
-
-  override def postStop(): Unit = {
-    manager.closeSocket(socket)
-  }
-
-  override def receive: Actor.Receive = {
-    case zmqMessage: ZMQMessage =>
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
-      socket.send(frames: _*)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala
deleted file mode 100644
index e38f2a0..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{ZMQMessage, SocketManager}
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a request socket.
- *
- * @param connection The address to connect to
- * @param listener The actor to send incoming messages back to
- */
-class ReqSocketActor(connection: String, listener: ActorRef)
-  extends Actor with LogLike
-{
-  logger.debug(s"Initializing request socket actor for $connection")
-  private val manager: SocketManager = new SocketManager
-  private val socket = manager.newReqSocket(connection, (message: Seq[String]) => {
-    listener ! ZMQMessage(message.map(ByteString.apply): _*)
-  })
-
-  override def postStop(): Unit = {
-    manager.closeSocket(socket)
-  }
-
-  override def receive: Actor.Receive = {
-    case zmqMessage: ZMQMessage =>
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
-      socket.send(frames: _*)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala
deleted file mode 100644
index 6aa3bc5..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{SocketManager, ZMQMessage}
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a router socket.
- *
- * @param connection The address to bind to
- * @param listener The actor to send incoming messages back to
- */
-class RouterSocketActor(connection: String, listener: ActorRef)
-  extends Actor with LogLike
-{
-  logger.debug(s"Initializing router socket actor for $connection")
-  private val manager: SocketManager = new SocketManager
-  private val socket = manager.newRouterSocket(connection, (message: Seq[String]) => {
-    listener ! ZMQMessage(message.map(ByteString.apply): _*)
-  })
-
-  override def postStop(): Unit = {
-    manager.closeSocket(socket)
-  }
-
-  override def receive: Actor.Receive = {
-    case zmqMessage: ZMQMessage =>
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
-      socket.send(frames: _*)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala
deleted file mode 100644
index 8fef496..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{ZMQMessage, SocketManager}
-import com.ibm.spark.utils.LogLike
-
-/**
- * Represents an actor containing a subscribe socket.
- *
- * @param connection The address to connect to
- * @param listener The actor to send incoming messages back to
- */
-class SubSocketActor(connection: String, listener: ActorRef)
-  extends Actor with LogLike
-{
-  logger.debug(s"Initializing subscribe socket actor for $connection")
-  private val manager: SocketManager = new SocketManager
-  private val socket = manager.newSubSocket(connection, (message: Seq[String]) => {
-    listener ! ZMQMessage(message.map(ByteString.apply): _*)
-  })
-
-  override def postStop(): Unit = {
-    manager.closeSocket(socket)
-  }
-
-  override def receive: Actor.Receive = {
-    case _ =>
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala b/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala
deleted file mode 100644
index 9f44177..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.security
-
-import javax.crypto.Mac
-import javax.crypto.spec.SecretKeySpec
-
-import com.ibm.spark.communication.security.HmacAlgorithm.HmacAlgorithm
-
-object HmacAlgorithm extends Enumeration {
-  type HmacAlgorithm = Value
-
-  def apply(key: String) = Value(key)
-
-  val MD5     = Value("HmacMD5")
-  val SHA1    = Value("HmacSHA1")
-  val SHA256  = Value("HmacSHA256")
-}
-
-object Hmac {
-  
-  def apply(key: String, algorithm: HmacAlgorithm = HmacAlgorithm.SHA256) =
-    new Hmac(key, algorithm)
-  
-  def newMD5(key: String): Hmac     = this(key, HmacAlgorithm.MD5)
-  def newSHA1(key: String): Hmac    = this(key, HmacAlgorithm.SHA1)
-  def newSHA256(key: String): Hmac  = this(key, HmacAlgorithm.SHA256)
-}
-
-class Hmac(
-  val key: String,
-  val algorithm: HmacAlgorithm = HmacAlgorithm.SHA256
-) {
-
-  private var mac: Mac = _
-  private var secretKeySpec: SecretKeySpec = _
-
-  if (key.nonEmpty) {
-    mac = Mac.getInstance(algorithm.toString)
-    secretKeySpec = new SecretKeySpec(key.getBytes, algorithm.toString)
-    mac.init(secretKeySpec)
-  }
-
-  def apply(items: String*): String = digest(items)
-
-  def digest(items: Seq[String]): String = if (key.nonEmpty) {
-    mac synchronized {
-      items.map(_.getBytes("UTF-8")).foreach(mac.update)
-      mac.doFinal().map("%02x" format _).mkString
-    }
-  } else ""
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala b/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala
deleted file mode 100644
index c3fabd7..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.security
-
-import akka.actor.Actor
-import com.ibm.spark.communication.utils.OrderedSupport
-import com.ibm.spark.utils.LogLike
-
-/**
- * Verifies whether or not a kernel message has a valid signature.
- * @param hmac The HMAC to use for signature validation
- */
-class SignatureCheckerActor(
-  private val hmac: Hmac
-) extends Actor with LogLike with OrderedSupport {
-  override def receive: Receive = {
-    case (signature: String, blob: Seq[_]) => withProcessing {
-      val stringBlob: Seq[String] = blob.map(_.toString)
-      val hmacString = hmac(stringBlob: _*)
-      val isValidSignature = hmacString == signature
-      logger.trace(s"Signature ${signature} validity checked against " +
-        s"hmac ${hmacString} with outcome ${isValidSignature}")
-      sender ! isValidSignature
-    }
-  }
-
-  /**
-   * Defines the types that will be stashed by {@link #waiting() waiting}
-   * while the Actor is in processing state.
-   * @return
-   */
-  override def orderedTypes(): Seq[Class[_]] = Seq(classOf[(String, Seq[_])])
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala b/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala
deleted file mode 100644
index f381644..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.security
-
-import akka.actor.{Props, ActorRef, Actor}
-import akka.util.Timeout
-import com.ibm.spark.communication.utils.OrderedSupport
-import com.ibm.spark.kernel.protocol.v5.KernelMessage
-import com.ibm.spark.utils.LogLike
-
-import scala.concurrent.duration._
-import akka.pattern.ask
-import akka.pattern.pipe
-
-class SignatureManagerActor(
-  key: String, scheme: String
-) extends Actor with LogLike with OrderedSupport {
-  private val hmac = Hmac(key, HmacAlgorithm(scheme))
-
-  def this(key: String) = this(key, HmacAlgorithm.SHA256.toString)
-
-  // NOTE: Required to provide the execution context for futures with akka
-  import context._
-
-  // NOTE: Required for ask (?) to function... maybe can define elsewhere?
-  implicit val timeout = Timeout(5.seconds)
-
-  //
-  // List of child actors that the signature manager contains
-  //
-  private var signatureChecker: ActorRef = _
-  private var signatureProducer: ActorRef = _
-
-  /**
-   * Initializes all child actors performing tasks for the interpreter.
-   */
-  override def preStart() = {
-    signatureChecker = context.actorOf(
-      Props(classOf[SignatureCheckerActor], hmac),
-      name = SignatureManagerChildActorType.SignatureChecker.toString
-    )
-    signatureProducer = context.actorOf(
-      Props(classOf[SignatureProducerActor], hmac),
-      name = SignatureManagerChildActorType.SignatureProducer.toString
-    )
-  }
-
-  override def receive: Receive = {
-    // Check blob strings for matching digest
-    case (signature: String, blob: Seq[_]) =>
-      startProcessing()
-      val destActor = sender()
-      val sigFuture = signatureChecker ? ((signature, blob))
-
-      sigFuture foreach { case isValid =>
-          destActor ! isValid
-          finishedProcessing()
-      }
-
-    case message: KernelMessage =>
-      startProcessing()
-      val destActor = sender()
-
-      // TODO: Proper error handling for possible exception from mapTo
-      val sigFuture = (signatureProducer ? message).mapTo[String].map(
-        result => message.copy(signature = result)
-      )
-
-      sigFuture foreach { case kernelMessage =>
-        destActor ! kernelMessage
-        finishedProcessing()
-      }
-  }
-
-  /**
-   * Defines the types that will be stashed by {@link #waiting() waiting}
-   * while the Actor is in processing state.
-   * @return
-   */
-  override def orderedTypes(): Seq[Class[_]] = Seq(
-    classOf[(String, Seq[_])],
-    classOf[KernelMessage]
-  )
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala b/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala
deleted file mode 100644
index 36b5688..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.security
-
-import akka.actor.Actor
-import com.ibm.spark.communication.utils.OrderedSupport
-import com.ibm.spark.kernel.protocol.v5.KernelMessage
-import com.ibm.spark.utils.LogLike
-import play.api.libs.json.Json
-
-/**
- * Constructs a signature from any kernel message received.
- * @param hmac The HMAC to use for signature construction
- */
-class SignatureProducerActor(
-  private val hmac: Hmac
-) extends Actor with LogLike with OrderedSupport {
-  override def receive: Receive = {
-    case message: KernelMessage => withProcessing {
-      val signature = hmac(
-        Json.stringify(Json.toJson(message.header)),
-        Json.stringify(Json.toJson(message.parentHeader)),
-        Json.stringify(Json.toJson(message.metadata)),
-        message.contentString
-      )
-      sender ! signature
-    }
-  }
-
-  /**
-   * Defines the types that will be stashed by {@link #waiting() waiting}
-   * while the Actor is in processing state.
-   * @return
-   */
-  override def orderedTypes(): Seq[Class[_]] = Seq(classOf[KernelMessage])
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/package.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/package.scala b/communication/src/main/scala/com/ibm/spark/communication/security/package.scala
deleted file mode 100644
index 5c5b1d5..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/package.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication
-
-package object security {
-  object SecurityActorType extends Enumeration {
-    type SecurityActorType = Value
-
-    val SignatureManager    = Value("signature_manager")
-  }
-
-  object SignatureManagerChildActorType extends Enumeration {
-    type SignatureManagerChildActorType = Value
-
-    val SignatureChecker  = Value("signature_checker")
-    val SignatureProducer = Value("signature_producer")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala
deleted file mode 100644
index c95eb69..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMsg
-
-/**
- * Represents a socket implemented using JeroMQ.
- *
- * @param runnable The underlying ZeroMQ socket runnable to use for the thread
- *                 managed by this socket
- */
-class JeroMQSocket(private val runnable: ZeroMQSocketRunnable)
-  extends SocketLike {
-
-  private val socketThread = new Thread(runnable)
-  socketThread.start()
-
-  /**
-   * Sends a message using this socket.
-   *
-   * @param message The message to send
-   */
-  override def send(message: String*): Unit = {
-    assert(isAlive, "Socket is not alive to be able to send messages!")
-
-    runnable.offer(ZMsg.newStringMsg(message: _*))
-  }
-
-  /**
-   * Closes the socket by closing the runnable and waiting for the underlying
-   * thread to close.
-   */
-  override def close(): Unit = {
-    runnable.close()
-    socketThread.join()
-  }
-
-  /**
-   * Indicates whether or not this socket is alive.
-   *
-   * @return True if alive (thread running), otherwise false
-   */
-  override def isAlive: Boolean = socketThread.isAlive
-
-  /**
-   * Indicates whether or not this socket is ready to send/receive messages.
-   *
-   * @return True if ready (runnable processing messages), otherwise false
-   */
-  override def isReady: Boolean = runnable.isProcessing
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala
deleted file mode 100644
index 43f3f3d..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMQ.{Socket, Context}
-
-/**
- * Represents the runnable component of a socket specifically targeted towards
- * publish sockets. No incoming messages are processed.
- *
- * @param context The ZMQ context to use with this runnable to create a socket
- * @param socketOptions The options to use when creating the socket
- */
-class PubSocketRunnable(
-  private val context: Context,
-  private val socketOptions: SocketOption*
-) extends ZeroMQSocketRunnable(
-  context,
-  PubSocket,
-  None,
-  socketOptions: _*
-) {
-  /** Does nothing. */
-  override protected def processNextInboundMessage(
-    socket: Socket,
-    flags: Int
-  ): Unit = {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala
deleted file mode 100644
index 0aa527f..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMQ.{Socket, Context}
-
-/**
- * Represents the runnable component of a socket that processes messages and
- * sends messages placed on an outbound queue. Targeted towards the request
- * socket, this runnable ensures that a message is sent out first and then a
- * response is received before sending the next message.
- *
- * @param context The ZMQ context to use with this runnable to create a socket
- * @param inboundMessageCallback The callback to invoke when receiving a message
- *                               on the socket created
- * @param socketOptions The options to use when creating the socket
- */
-class ReqSocketRunnable(
-  private val context: Context,
-  private val inboundMessageCallback: Option[(Seq[String]) => Unit],
-  private val socketOptions: SocketOption*
-) extends ZeroMQSocketRunnable(
-  context,
-  ReqSocket,
-  inboundMessageCallback,
-  socketOptions: _*
-) {
-  /** Does nothing. */
-  override protected def processNextInboundMessage(
-    socket: Socket,
-    flags: Int
-  ): Unit = {}
-
-  /**
-   * Sends a message and then waits for an incoming response (if a message
-   * was sent from the outbound queue).
-   *
-   * @param socket The socket to use when sending the message
-   *
-   * @return True if a message was sent, otherwise false
-   */
-  override protected def processNextOutboundMessage(socket: Socket): Boolean = {
-    val shouldReceiveMessage = super.processNextOutboundMessage(socket)
-
-    if (shouldReceiveMessage) {
-      super.processNextInboundMessage(socket, 0)
-    }
-
-    shouldReceiveMessage
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala
deleted file mode 100644
index 9bf752d..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-/**
- * Represents a generic interface for socket communication.
- */
-trait SocketLike {
-  /**
-   * Sends a message through the socket if alive.
-   *
-   * @throws AssertionError If the socket is not alive when attempting to send
-   *                        a message
-   *
-   * @param message The message to send
-   */
-  def send(message: String*): Unit
-
-  /**
-   * Closes the socket, marking it no longer able to process or send messages.
-   */
-  def close(): Unit
-
-  /**
-   * Returns whether or not the socket is alive (processing new messages and
-   * capable of sending out messages).
-   *
-   * @return True if alive, otherwise false
-   */
-  def isAlive: Boolean
-
-  /**
-   * Returns whether or not the socket is ready to send/receive messages.
-   *
-   * @return True if ready, otherwise false
-   */
-  def isReady: Boolean
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala
deleted file mode 100644
index 7685239..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMQ
-
-/** Represents an option to provide to a socket. */
-sealed trait SocketOption
-
-/**
- * Represents the linger option used to communicate the millisecond duration
- * to continue processing messages after the socket has been told to close.
- *
- * @note Provide -1 as the duration to wait until all messages are processed
- *
- * @param milliseconds The duration in milliseconds
- */
-case class Linger(milliseconds: Int) extends SocketOption
-
-/**
- * Represents the subscribe option used to filter messages coming into a
- * socket subscribing to a publisher. Uses the provided byte prefix to filter
- * incoming messages.
- *
- * @param topic The array of bytes to use as a filter based on the
- *              bytes at the beginning of incoming messages
- */
-case class Subscribe(topic: Array[Byte]) extends SocketOption
-object Subscribe {
-  val all = Subscribe(ZMQ.SUBSCRIPTION_ALL)
-}
-
-/**
- * Represents the identity option used to identify the socket.
- *
- * @param identity The identity to use with the socket
- */
-case class Identity(identity: Array[Byte]) extends SocketOption
-
-/**
- * Represents the bind option used to tell the socket what address to bind to.
- *
- * @param address The address for the socket to use
- */
-case class Bind(address: String) extends SocketOption
-
-/**
- * Represents the connect option used to tell the socket what address to
- * connect to.
- *
- * @param address The address for the socket to use
- */
-case class Connect(address: String) extends SocketOption

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala
deleted file mode 100644
index 6c033cc..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-/**
- * Represents the interface for a runnable used to send and receive messages
- * for a socket.
- *
- * @param inboundMessageCallback The callback to use when receiving a message
- *                               through this runnable
- */
-abstract class SocketRunnable[T](
-   private val inboundMessageCallback: Option[(Seq[String]) => Unit]
-) extends Runnable {
-
-  /** The collection of messages to be sent out through the socket. */
-  val outboundMessages: ConcurrentLinkedQueue[T] =
-    new ConcurrentLinkedQueue[T]()
-
-  /**
-   * Attempts to add a new message to the outbound queue to be sent out.
-   *
-   * @param message The message to add to the queue
-   *
-   * @return True if successfully queued the message, otherwise false
-   */
-  def offer(message: T): Boolean = outboundMessages.offer(message)
-
-  /**
-   * Indicates whether or not the runnable is processing messages (both
-   * sending and receiving).
-   *
-   * @return True if processing, otherwise false
-   */
-  def isProcessing: Boolean
-
-  /**
-   * Closes the runnable such that it no longer processes messages and also
-   * closes the underlying socket associated with the runnable.
-   */
-  def close(): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala
deleted file mode 100644
index 7062d85..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMQ
-
-/**
- * Represents the type option used to indicate the type of socket to create.
- *
- * @param `type` The type as an integer
- */
-sealed class SocketType(val `type`: Int)
-
-/** Represents a publish socket. */
-case object PubSocket extends SocketType(ZMQ.PUB)
-
-/** Represents a subscribe socket. */
-case object SubSocket extends SocketType(ZMQ.SUB)
-
-/** Represents a reply socket. */
-case object RepSocket extends SocketType(ZMQ.REP)
-
-/** Represents a request socket. */
-case object ReqSocket extends SocketType(ZMQ.REQ)
-
-/** Represents a router socket. */
-case object RouterSocket extends SocketType(ZMQ.ROUTER)
-
-/** Represents a dealer socket. */
-case object DealerSocket extends SocketType(ZMQ.DEALER)

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
deleted file mode 100644
index 6fee716..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import com.ibm.spark.utils.LogLike
-import org.zeromq.{ZMsg, ZMQ}
-import org.zeromq.ZMQ.Context
-
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-/**
- * Represents the runnable component of a socket that processes messages and
- * sends messages placed on an outbound queue.
- *
- * @param context The ZMQ context to use with this runnable to create a socket
- * @param socketType The type of socket to create
- * @param inboundMessageCallback The callback to invoke when receiving a message
- *                               on the socket created
- * @param socketOptions The options to use when creating the socket
- */
-class ZeroMQSocketRunnable(
-  private val context: Context,
-  private val socketType: SocketType,
-  private val inboundMessageCallback: Option[(Seq[String]) => Unit],
-  private val socketOptions: SocketOption*
-) extends SocketRunnable[ZMsg](inboundMessageCallback)
-  with LogLike {
-  require(socketOptions.count {
-    case _: Bind    => true
-    case _: Connect => true
-    case _          => false
-  } == 1, "ZeroMQ socket needs exactly one bind or connect!")
-
-  @volatile private var notClosed: Boolean = true
-  @volatile private var _isProcessing: Boolean = false
-
-  /**
-   * Indicates the processing state of this runnable.
-   *
-   * @return True if processing messages, otherwise false
-   */
-  override def isProcessing: Boolean = _isProcessing
-
-  /**
-   * Processes the provided options, performing associated actions on the
-   * specified socket.
-   *
-   * @param socket The socket to apply actions on
-   */
-  protected def processOptions(socket: ZMQ.Socket): Unit = {
-    val socketOptionsString = socketOptions.map("\n- " + _.toString).mkString("")
-    logger.trace(
-      s"Processing options for socket $socketType: $socketOptionsString"
-    )
-
-    // Split our options based on connection (bind/connect) and everything else
-    val (connectionOptions, otherOptions) = socketOptions.partition {
-      case Bind(_) | Connect(_) => true
-      case _ => false
-    }
-
-    // Apply non-connection options first since some (like identity) must be
-    // run before the socket does a bind/connect
-    otherOptions.foreach {
-      case Linger(milliseconds) => socket.setLinger(milliseconds)
-      case Subscribe(topic)     => socket.subscribe(topic)
-      case Identity(identity)   => socket.setIdentity(identity)
-      case option               => logger.warn(s"Unknown option: $option")
-    }
-
-    // Perform our bind or connect
-    connectionOptions.foreach {
-      case Bind(address)        => socket.bind(address)
-      case Connect(address)     => socket.connect(address)
-      case option               =>
-        logger.warn(s"Unknown connection option: $option")
-    }
-
-    _isProcessing = true
-  }
-
-  /**
-   * Sends the next outbound message from the outbound message queue.
-   *
-   * @param socket The socket to use when sending the message
-   *
-   * @return True if a message was sent, otherwise false
-   */
-  protected def processNextOutboundMessage(socket: ZMQ.Socket): Boolean = {
-    val message = Option(outboundMessages.poll())
-
-    message.foreach(_.send(socket))
-
-    message.nonEmpty
-  }
-
-  /**
-   * Retrieves the next inbound message (if available) and invokes the
-   * inbound message callback.
-   *
-   * @param socket The socket whose next incoming message to retrieve
-   */
-  protected def processNextInboundMessage(
-    socket: ZMQ.Socket,
-    flags: Int = ZMQ.DONTWAIT
-  ): Unit = {
-    Option(ZMsg.recvMsg(socket, flags)).foreach(zMsg => {
-      inboundMessageCallback.foreach(_(zMsg.asScala.toSeq
-        .map(zFrame => new String(zFrame.getData, ZMQ.CHARSET))
-      ))
-    })
-  }
-
-  /**
-   * Creates a new instance of a ZMQ Socket.
-   *
-   * @param zmqContext The context to use to create the socket
-   * @param socketType The type of socket to create
-   *
-   * @return The new ZMQ.Socket instance
-   */
-  protected def newZmqSocket(zmqContext: ZMQ.Context, socketType: Int) =
-    zmqContext.socket(socketType)
-
-  override def run(): Unit = {
-    val socket = newZmqSocket(context, socketType.`type`)//context.socket(socketType.`type`)
-
-    try {
-      processOptions(socket)
-
-      while (notClosed) {
-        Try(processNextOutboundMessage(socket)).failed.foreach(
-          logger.error("Failed to send next outgoing message!", _: Throwable)
-        )
-        Try(processNextInboundMessage(socket)).failed.foreach(
-          logger.error("Failed to retrieve next incoming message!", _: Throwable)
-        )
-        Thread.sleep(1)
-      }
-    } catch {
-      case ex: Exception =>
-        logger.error("Unexpected exception in 0mq socket runnable!", ex)
-    } finally {
-      try{
-        socket.close()
-      } catch {
-        case ex: Exception =>
-          logger.error("Failed to close socket!", _: Throwable)
-      }
-    }
-  }
-
-  /**
-   * Marks the runnable as closed such that it eventually stops processing
-   * messages and closes the socket.
-   *
-   * @throws AssertionError If the runnable is not processing messages or has
-   *                        already been closed
-   */
-  override def close(): Unit = {
-    assert(_isProcessing && notClosed,
-      "Runnable is not processing or is closed!")
-
-    _isProcessing = false
-    notClosed = false
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala b/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala
deleted file mode 100644
index 8f41861..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.utils
-
-import akka.actor.{Actor, Stash}
-import com.ibm.spark.utils.LogLike
-
-/**
- * A trait to enforce ordered processing for messages of particular types.
- */
-trait OrderedSupport extends Actor with Stash with LogLike {
-  /**
-   * Executes instead of the default receive when the Actor has begun
-   * processing. Stashes incoming messages of particular types, defined by
-   * {@link #orderedTypes() orderedTypes} function, for later processing. Uses
-   * the default receive method for all other types. Upon receiving a
-   * FinishedProcessing message, resumes processing all messages with the
-   * default receive.
-   * @return
-   */
-  def waiting : Receive = {
-    case FinishedProcessing =>
-      context.unbecome()
-      unstashAll()
-    case aVal: Any if (orderedTypes().contains(aVal.getClass)) =>
-      logger.trace(s"Stashing message ${aVal} of type ${aVal.getClass}.")
-      stash()
-    case aVal: Any =>
-      logger.trace(s"Forwarding message ${aVal} of type ${aVal.getClass} " +
-        "to default receive.")
-      receive(aVal)
-  }
-
-  /**
-   * Suspends the default receive method for types defined by the
-   * {@link #orderedTypes() orderedTypes} function.
-   */
-  def startProcessing(): Unit = {
-    logger.debug("Actor is in processing state and will stash messages of " +
-      s"types: ${orderedTypes.mkString(" ")}")
-    context.become(waiting, discardOld = false)
-  }
-
-  /**
-   * Resumes the default receive method for all message types.
-   */
-  def finishedProcessing(): Unit = {
-    logger.debug("Actor is no longer in processing state.")
-    self ! FinishedProcessing
-  }
-
-  /**
-   * Executes a block of code, wrapping it in start/finished processing
-   * needed for ordered execution.
-   *
-   * @param block The block to execute
-   * @tparam T The return type of the block
-   * @return The result of executing the block
-   */
-  def withProcessing[T](block: => T): T = {
-    startProcessing()
-    val results = block
-    finishedProcessing()
-    results
-  }
-
-  /**
-   * Defines the types that will be stashed by {@link #waiting() waiting}
-   * while the Actor is in processing state.
-   * @return
-   */
-  def orderedTypes(): Seq[Class[_]]
-
-  case object FinishedProcessing
-}