You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:35 UTC
[27/51] [abbrv] incubator-toree git commit: Moved scala files to new
locations based on new package
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
new file mode 100644
index 0000000..9fdd702
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.testkit.{TestProbe, ImplicitSender, TestKit}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSpecLike}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+
+class HeartbeatClientSpec extends TestKit(ActorSystem("HeartbeatActorSpec"))
+ with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
+
+ describe("HeartbeatClientActor") {
+ val socketFactory = mock[SocketFactory]
+ val mockActorLoader = mock[ActorLoader]
+ val probe : TestProbe = TestProbe()
+ when(socketFactory.HeartbeatClient(any(classOf[ActorSystem]), any(classOf[ActorRef]))).thenReturn(probe.ref)
+
+ val heartbeatClient = system.actorOf(Props(
+ classOf[HeartbeatClient], socketFactory, mockActorLoader, true
+ ))
+
+ describe("send heartbeat") {
+ it("should send ping ZMQMessage") {
+ heartbeatClient ! HeartbeatMessage
+ probe.expectMsgClass(classOf[ZMQMessage])
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
new file mode 100644
index 0000000..b592dcd
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import java.util.UUID
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.pattern.ask
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import akka.util.Timeout
+import com.ibm.spark.comm.{CommCallbacks, CommRegistrar, CommStorage, CommWriter}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.Utilities._
+import com.ibm.spark.kernel.protocol.v5.client.execution.{DeferredExecution, DeferredExecutionManager}
+import com.ibm.spark.kernel.protocol.v5.client.{ActorLoader, Utilities}
+import com.ibm.spark.kernel.protocol.v5.content.{CommClose, CommMsg, CommOpen, StreamContent}
+import com.typesafe.config.ConfigFactory
+import org.mockito.Matchers.{eq => mockEq, _}
+import org.mockito.Mockito._
+import org.scalatest.concurrent.{Eventually, ScalaFutures}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.time.{Milliseconds, Span}
+import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
+import play.api.libs.json.Json
+
+import scala.concurrent.duration._
+import scala.concurrent.{Future, Promise}
+import scala.util.Failure
+
+object IOPubClientSpec {
+ val config ="""
+ akka {
+ loglevel = "WARNING"
+ }"""
+}
+
+class IOPubClientSpec extends TestKit(ActorSystem(
+ "IOPubClientSpecSystem", ConfigFactory.parseString(IOPubClientSpec.config)
+)) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
+ with ScalaFutures with BeforeAndAfter with Eventually
+{
+ private val TestTimeout = Timeout(10.seconds)
+ implicit override val patienceConfig = PatienceConfig(
+ timeout = scaled(Span(200, Milliseconds)),
+ interval = scaled(Span(5, Milliseconds))
+ )
+ private val SignatureEnabled = true
+
+ private var clientSocketProbe: TestProbe = _
+ private var mockClientSocketFactory: SocketFactory = _
+ private var mockActorLoader: ActorLoader = _
+ private var mockCommRegistrar: CommRegistrar = _
+ private var spyCommStorage: CommStorage = _
+ private var mockCommCallbacks: CommCallbacks = _
+ private var ioPubClient: ActorRef = _
+
+ private var kmBuilder: KMBuilder = _
+
+ private val id = UUID.randomUUID().toString
+ private val TestTargetName = "some target"
+ private val TestCommId = UUID.randomUUID().toString
+
+ before {
+ kmBuilder = KMBuilder()
+ mockCommCallbacks = mock[CommCallbacks]
+ mockCommRegistrar = mock[CommRegistrar]
+
+ spyCommStorage = spy(new CommStorage())
+
+ clientSocketProbe = TestProbe()
+ mockActorLoader = mock[ActorLoader]
+ mockClientSocketFactory = mock[SocketFactory]
+
+ // Stub the return value for the socket factory
+ when(mockClientSocketFactory.IOPubClient(anyObject(), any[ActorRef]))
+ .thenReturn(clientSocketProbe.ref)
+
+ // Construct the object we will test against
+ ioPubClient = system.actorOf(Props(
+ classOf[IOPubClient], mockClientSocketFactory, mockActorLoader,
+ SignatureEnabled, mockCommRegistrar, spyCommStorage
+ ))
+ }
+
+ describe("IOPubClient") {
+ describe("#receive") {
+ it("should execute all Comm open callbacks on comm_open message") {
+ val message: ZMQMessage = kmBuilder
+ .withHeader(CommOpen.toTypeString)
+ .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
+ .build
+
+ // Mark as target being provided
+ doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
+ .getTargetCallbacks(anyString())
+
+ // Simulate receiving a message from the kernel
+ ioPubClient ! message
+
+ // Check to see if "eventually" the callback is triggered
+ eventually {
+ verify(mockCommCallbacks).executeOpenCallbacks(
+ any[CommWriter], mockEq(TestCommId),
+ mockEq(TestTargetName), any[v5.MsgData])
+ }
+ }
+
+ it("should not execute Comm open callbacks if the target is not found") {
+ val message: ZMQMessage = kmBuilder
+ .withHeader(CommOpen.toTypeString)
+ .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty))
+ .build
+
+ // Mark as target NOT being provided
+ doReturn(None).when(spyCommStorage).getTargetCallbacks(anyString())
+
+ // Simulate receiving a message from the kernel
+ ioPubClient ! message
+
+ // Check to see if "eventually" the callback is NOT triggered
+ eventually {
+ // Check that we have checked if the target exists
+ verify(spyCommStorage).getTargetCallbacks(TestTargetName)
+
+ verify(mockCommCallbacks, never()).executeOpenCallbacks(
+ any[CommWriter], mockEq(TestCommId),
+ mockEq(TestTargetName), any[v5.MsgData])
+ verify(mockCommRegistrar, never()).link(TestTargetName, TestCommId)
+ }
+ }
+
+ it("should execute all Comm msg callbacks on comm_msg message") {
+ val message: ZMQMessage = kmBuilder
+ .withHeader(CommMsg.toTypeString)
+ .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
+ .build
+
+ // Mark the Comm id as having registered callbacks
+ doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
+ .getCommIdCallbacks(any[v5.UUID])
+
+ // Simulate receiving a message from the kernel
+ ioPubClient ! message
+
+ // Check to see if "eventually" the callback is triggered
+ eventually {
+ verify(mockCommCallbacks).executeMsgCallbacks(
+ any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
+ }
+ }
+
+ it("should not execute Comm msg callbacks if the Comm id is not found") {
+ val message: ZMQMessage = kmBuilder
+ .withHeader(CommMsg.toTypeString)
+ .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
+ .build
+
+ // Mark the Comm id as NOT having registered callbacks
+ doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID])
+
+ // Simulate receiving a message from the kernel
+ ioPubClient ! message
+
+ // Check to see if "eventually" the callback is NOT triggered
+ eventually {
+ // Check that the callbacks for the Comm id were looked up
+ verify(spyCommStorage).getCommIdCallbacks(TestCommId)
+
+ verify(mockCommCallbacks, never()).executeMsgCallbacks(
+ any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
+ }
+ }
+
+ it("should execute all Comm close callbacks on comm_close message") {
+ val message: ZMQMessage = kmBuilder
+ .withHeader(CommClose.toTypeString)
+ .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
+ .build
+
+ // Mark the Comm id as having registered callbacks
+ doReturn(Some(mockCommCallbacks)).when(spyCommStorage)
+ .getCommIdCallbacks(any[v5.UUID])
+
+ // Simulate receiving a message from the kernel
+ ioPubClient ! message
+
+ // Check to see if "eventually" the callback is triggered
+ eventually {
+ verify(mockCommCallbacks).executeCloseCallbacks(
+ any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
+ }
+ }
+
+ it("should not execute Comm close callbacks if Comm id is not found") {
+ val message: ZMQMessage = kmBuilder
+ .withHeader(CommClose.toTypeString)
+ .withContentString(CommClose(TestCommId, v5.MsgData.Empty))
+ .build
+
+ // Mark the Comm id as NOT having registered callbacks
+ doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID])
+
+ // Simulate receiving a message from the kernel
+ ioPubClient ! message
+
+ // Check to see if "eventually" the callback is NOT triggered
+ eventually {
+ // Check that the callbacks for the Comm id were looked up
+ verify(spyCommStorage).getCommIdCallbacks(TestCommId)
+
+ verify(mockCommCallbacks, never()).executeCloseCallbacks(
+ any[CommWriter], mockEq(TestCommId), any[v5.MsgData])
+ }
+ }
+
+ it("should call a registered callback on stream message") {
+ // Header, parent header and the deferred execution all share `id`
+ val result = StreamContent("foo", "bar")
+ val header = Header(id, "spark", id,
+ MessageType.Outgoing.Stream.toString, "5.0")
+ val parentHeader = Header(id, "spark", id,
+ MessageType.Incoming.ExecuteRequest.toString, "5.0")
+
+ val kernelMessage = new KernelMessage(
+ Seq[String](),
+ "",
+ header,
+ parentHeader,
+ Metadata(),
+ Json.toJson(result).toString()
+ )
+ // The stream callback completes the promise with the received text
+ val promise: Promise[String] = Promise()
+ val de: DeferredExecution = DeferredExecution().onStream(
+ (content: StreamContent) => {
+ promise.success(content.text)
+ }
+ )
+ DeferredExecutionManager.add(id, de)
+ // Send the message to the IOPubClient
+ val zmqMessage: ZMQMessage = kernelMessage
+
+ ioPubClient ! zmqMessage
+
+ // Compare by value: `should be eq(...)` called AnyRef#eq and checked nothing
+ whenReady(promise.future) { res =>
+ res should be ("bar")
+ }
+ }
+
+ it("should not invoke callback when stream message's parent header is null") {
+ // Construct the kernel message
+ val result = StreamContent("foo", "bar")
+ val header = Header(id, "spark", id,
+ MessageType.Outgoing.Stream.toString, "5.0")
+
+ val kernelMessage = new KernelMessage(
+ Seq[String](),
+ "",
+ header,
+ null,
+ Metadata(),
+ Json.toJson(result).toString()
+ )
+
+ // Send the message to the IOPubClient
+ val zmqMessage: ZMQMessage = kernelMessage
+ val futureResult: Future[Any] = ioPubClient.ask(zmqMessage)(TestTimeout)
+ whenReady(futureResult) {
+ case result: Failure[Any] =>
+ // Getting the value of the failure causes the underlying exception to be thrown
+ try {
+ result.get
+ } catch {
+ case t:RuntimeException =>
+ t.getMessage should be("Parent Header was null in Kernel Message.")
+ }
+ case result =>
+ fail(s"Did not receive failure!! ${result}")
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
new file mode 100644
index 0000000..0110dfd
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import java.util.UUID
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.testkit.{TestProbe, ImplicitSender, TestKit}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.communication.security.SecurityActorType
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.content.ExecuteRequest
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSpecLike}
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import play.api.libs.json.Json
+
+class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec"))
+ with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
+ private val SignatureEnabled = true
+
+ describe("ShellClientActor") {
+ val socketFactory = mock[SocketFactory]
+ val mockActorLoader = mock[ActorLoader]
+ val probe : TestProbe = TestProbe()
+ when(socketFactory.ShellClient(
+ any(classOf[ActorSystem]), any(classOf[ActorRef])
+ )).thenReturn(probe.ref)
+
+ val signatureManagerProbe = TestProbe()
+ doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString))
+ .when(mockActorLoader).load(SecurityActorType.SignatureManager)
+
+ val shellClient = system.actorOf(Props(
+ classOf[ShellClient], socketFactory, mockActorLoader, SignatureEnabled
+ ))
+
+ describe("send execute request") {
+ it("should send execute request") {
+ val request = ExecuteRequest(
+ "foo", false, true, UserExpressions(), true
+ )
+ val header = Header(
+ UUID.randomUUID().toString, "spark",
+ UUID.randomUUID().toString, MessageType.Incoming.ExecuteRequest.toString,
+ "5.0"
+ )
+ val kernelMessage = KernelMessage(
+ Seq[String](), "",
+ header, HeaderBuilder.empty,
+ Metadata(), Json.toJson(request).toString
+ )
+ shellClient ! kernelMessage
+
+ // Echo back the kernel message sent to have a signature injected
+ signatureManagerProbe.expectMsgClass(classOf[KernelMessage])
+ signatureManagerProbe.reply(kernelMessage)
+
+ probe.expectMsgClass(classOf[ZMQMessage])
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala
new file mode 100644
index 0000000..877c4f5
--- /dev/null
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.client.socket
+
+import akka.actor.{ActorRef, Props, ActorSystem}
+import akka.testkit.{TestProbe, ImplicitSender, TestKit}
+import com.ibm.spark.communication.ZMQMessage
+import com.ibm.spark.communication.security.SecurityActorType
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.client.ActorLoader
+import com.ibm.spark.kernel.protocol.v5.client.socket.StdinClient.ResponseFunction
+import com.ibm.spark.kernel.protocol.v5.content.{InputReply, InputRequest, ClearOutput, ExecuteRequest}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
+import com.ibm.spark.kernel.protocol.v5.client.Utilities._
+import play.api.libs.json.Json
+import scala.concurrent.duration._
+
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+
+class StdinClientSpec extends TestKit(ActorSystem("StdinActorSpec"))
+ with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
+ with BeforeAndAfter
+{
+ private val SignatureEnabled = true
+ private val TestReplyString = "some value"
+ private val TestResponseFunc: ResponseFunction = (_, _) => TestReplyString
+
+ private var mockSocketFactory: SocketFactory = _
+ private var mockActorLoader: ActorLoader = _
+ private var signatureManagerProbe: TestProbe = _
+ private var socketProbe: TestProbe = _
+ private var stdinClient: ActorRef = _
+
+ before {
+ socketProbe = TestProbe()
+ signatureManagerProbe = TestProbe()
+ mockSocketFactory = mock[SocketFactory]
+ mockActorLoader = mock[ActorLoader]
+ doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString))
+ .when(mockActorLoader).load(SecurityActorType.SignatureManager)
+ doReturn(socketProbe.ref).when(mockSocketFactory)
+ .StdinClient(any[ActorSystem], any[ActorRef])
+
+ stdinClient = system.actorOf(Props(
+ classOf[StdinClient], mockSocketFactory, mockActorLoader, SignatureEnabled
+ ))
+
+ // Set the response function for our client socket
+ stdinClient ! TestResponseFunc
+ }
+
+ describe("StdinClient") {
+ describe("#receive") {
+ it("should update the response function if receiving a new one") {
+ val expected = "some other value"
+ val replacementFunc: ResponseFunction = (_, _) => expected
+
+ // Update the function
+ stdinClient ! replacementFunc
+
+ val inputRequestMessage: ZMQMessage = KMBuilder()
+ .withHeader(InputRequest.toTypeString)
+ .withContentString(InputRequest("", false))
+ .build
+
+ stdinClient ! inputRequestMessage
+
+ // Echo back the kernel message sent to have a signature injected
+ signatureManagerProbe.expectMsgPF() {
+ case kernelMessage: KernelMessage =>
+ signatureManagerProbe.reply(kernelMessage)
+ true
+ }
+
+ socketProbe.expectMsgPF() {
+ case zmqMessage: ZMQMessage =>
+ val kernelMessage: KernelMessage = zmqMessage
+ val inputReply =
+ Json.parse(kernelMessage.contentString).as[InputReply]
+ inputReply.value should be (expected)
+ }
+ }
+
+ it("should do nothing if the incoming message is not an input_request") {
+ val notInputRequestMessage: ZMQMessage = KMBuilder()
+ .withHeader(ClearOutput.toTypeString)
+ .build
+
+ stdinClient ! notInputRequestMessage
+
+ socketProbe.expectNoMsg(300.milliseconds)
+ }
+
+ it("should respond with an input_reply if the incoming message is " +
+ "an input_request") {
+ val inputRequestMessage: ZMQMessage = KMBuilder()
+ .withHeader(InputRequest.toTypeString)
+ .withContentString(InputRequest("", false))
+ .build
+
+ stdinClient ! inputRequestMessage
+
+ // Echo back the kernel message sent to have a signature injected
+ signatureManagerProbe.expectMsgPF() {
+ case kernelMessage: KernelMessage =>
+ signatureManagerProbe.reply(kernelMessage)
+ true
+ }
+
+ socketProbe.expectMsgPF() {
+ case zmqMessage: ZMQMessage =>
+ val kernelMessage: KernelMessage = zmqMessage
+ val messageType = kernelMessage.header.msg_type
+ messageType should be (InputReply.toTypeString)
+ }
+ }
+
+ it("should use the result from the response function if the incoming " +
+ "message is an input_request") {
+ val inputRequestMessage: ZMQMessage = KMBuilder()
+ .withHeader(InputRequest.toTypeString)
+ .withContentString(InputRequest("", false))
+ .build
+
+ stdinClient ! inputRequestMessage
+
+ // Echo back the kernel message sent to have a signature injected
+ signatureManagerProbe.expectMsgPF() {
+ case kernelMessage: KernelMessage =>
+ signatureManagerProbe.reply(kernelMessage)
+ true
+ }
+
+ socketProbe.expectMsgPF() {
+ case zmqMessage: ZMQMessage =>
+ val kernelMessage: KernelMessage = zmqMessage
+ val inputReply =
+ Json.parse(kernelMessage.contentString).as[InputReply]
+ inputReply.value should be (TestReplyString)
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala b/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
deleted file mode 100644
index 994360f..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication
-
-import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
-import com.ibm.spark.communication.socket._
-import org.zeromq.ZMQ
-
-import scala.collection.JavaConverters._
-
-/**
- * Represents the factory for sockets that also manages ZMQ contexts and
- * facilitates closing of sockets created by the factory.
- */
-class SocketManager {
- /**
- * Creates a new ZMQ context with a single IO thread.
- *
- * @return The new ZMQ context
- */
- protected def newZmqContext(): ZMQ.Context = ZMQ.context(1)
-
- private val socketToContextMap =
- new ConcurrentHashMap[SocketLike, ZMQ.Context]().asScala
-
- /**
- * Provides and registers a new ZMQ context, used for creating a new socket.
- * @param mkSocket a function that creates a socket using a given context
- * @return the new socket
- * @see newZmqContext
- */
- private def withNewContext[A <: SocketLike](mkSocket: ZMQ.Context => A): A = {
- val ctx = newZmqContext()
- val socket = mkSocket(ctx)
- socketToContextMap.put(socket, ctx)
- socket
- }
-
- /**
- * Closes the socket provided and also closes the context if no more sockets
- * are using the context.
- *
- * @param socket The socket to close
- */
- def closeSocket(socket: SocketLike) = {
- socket.close()
-
- socketToContextMap.remove(socket).foreach(context => {
- if (!socketToContextMap.values.exists(_ == context)) context.close()
- })
- }
-
- /**
- * Creates a new request socket.
- *
- * @param address The address to associate with the socket
- * @param inboundMessageCallback The callback to use for incoming messages
- *
- * @return The new socket instance
- */
- def newReqSocket(
- address: String,
- inboundMessageCallback: (Seq[String]) => Unit
- ): SocketLike = withNewContext{ ctx =>
- new JeroMQSocket(new ReqSocketRunnable(
- ctx,
- Some(inboundMessageCallback),
- Connect(address),
- Linger(0)
- ))
- }
-
- /**
- * Creates a new reply socket.
- *
- * @param address The address to associate with the socket
- * @param inboundMessageCallback The callback to use for incoming messages
- *
- * @return The new socket instance
- */
- def newRepSocket(
- address: String,
- inboundMessageCallback: (Seq[String]) => Unit
- ): SocketLike = withNewContext{ ctx =>
- new JeroMQSocket(new ZeroMQSocketRunnable(
- ctx,
- RepSocket,
- Some(inboundMessageCallback),
- Bind(address),
- Linger(0)
- ))
- }
-
- /**
- * Creates a new publish socket.
- *
- * @param address The address to associate with the socket
- *
- * @return The new socket instance
- */
- def newPubSocket(
- address: String
- ): SocketLike = withNewContext{ ctx =>
- new JeroMQSocket(new PubSocketRunnable(
- ctx,
- Bind(address),
- Linger(0)
- ))
- }
-
- /**
- * Creates a new subscribe socket.
- *
- * @param address The address to associate with the socket
- * @param inboundMessageCallback The callback to use for incoming messages
- *
- * @return The new socket instance
- */
- def newSubSocket(
- address: String,
- inboundMessageCallback: (Seq[String]) => Unit
- ): SocketLike = withNewContext { ctx =>
- new JeroMQSocket(new ZeroMQSocketRunnable(
- ctx,
- SubSocket,
- Some(inboundMessageCallback),
- Connect(address),
- Linger(0),
- Subscribe.all
- ))
- }
-
- /**
- * Creates a new router socket.
- *
- * @param address The address to associate with the socket
- * @param inboundMessageCallback The callback to use for incoming messages
- *
- * @return The new socket instance
- */
- def newRouterSocket(
- address: String,
- inboundMessageCallback: (Seq[String]) => Unit
- ): SocketLike = withNewContext { ctx =>
- new JeroMQSocket(new ZeroMQSocketRunnable(
- ctx,
- RouterSocket,
- Some(inboundMessageCallback),
- Bind(address),
- Linger(0)
- ))
- }
-
- /**
- * Creates a new dealer socket.
- *
- * @param address The address to associate with the socket
- * @param inboundMessageCallback The callback to use for incoming messages
- *
- * @return The new socket instance
- */
- def newDealerSocket(
- address: String,
- inboundMessageCallback: (Seq[String]) => Unit,
- identity: String = UUID.randomUUID().toString
- ): SocketLike = withNewContext{ ctx =>
- new JeroMQSocket(new ZeroMQSocketRunnable(
- ctx,
- DealerSocket,
- Some(inboundMessageCallback),
- Connect(address),
- Linger(0),
- Identity(identity.getBytes(ZMQ.CHARSET))
- ))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala b/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala
deleted file mode 100644
index ffa7705..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication
-
-import akka.util.ByteString
-
-/**
- * Represents a ZeroMQ message containing a collection of Akka ByteString
- * instances.
- *
- * @note This is left in for backwards compatibility!
- *
- * @param frames The collection of Akka ByteString instances
- */
-case class ZMQMessage(frames: ByteString*) {
- def frame(i: Int) = frames(i)
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala
deleted file mode 100644
index 0f0497d..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{ZMQMessage, SocketManager}
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a dealer socket.
- *
- * @param connection The address to connect to
- * @param listener The actor to send incoming messages back to
- */
-class DealerSocketActor(connection: String, listener: ActorRef)
- extends Actor with LogLike
-{
- logger.debug(s"Initializing dealer socket actor for $connection")
- private val manager: SocketManager = new SocketManager
- private val socket = manager.newDealerSocket(connection, (message: Seq[String]) => {
- listener ! ZMQMessage(message.map(ByteString.apply): _*)
- })
-
- override def postStop(): Unit = {
- manager.closeSocket(socket)
- }
-
- override def receive: Actor.Receive = {
- case zmqMessage: ZMQMessage =>
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
- socket.send(frames: _*)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala
deleted file mode 100644
index f74764e..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.Actor
-import com.ibm.spark.communication.utils.OrderedSupport
-import com.ibm.spark.communication.{SocketManager, ZMQMessage}
-import com.ibm.spark.kernel.protocol.v5.KernelMessage
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a publish socket.
- *
- * Note: OrderedSupport is used to ensure correct processing order.
- * A similar pattern may be useful for other socket actors if
- * issues arise in the future.
- *
- * @param connection The address to bind to
- */
-class PubSocketActor(connection: String)
- extends Actor with LogLike with OrderedSupport
-{
- logger.debug(s"Initializing publish socket actor for $connection")
- private val manager: SocketManager = new SocketManager
- private val socket = manager.newPubSocket(connection)
-
- override def postStop(): Unit = {
- manager.closeSocket(socket)
- }
-
- override def receive: Actor.Receive = {
- case zmqMessage: ZMQMessage => withProcessing {
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
-
- socket.send(frames: _*)
- }
- }
-
- /**
- * Defines the types that will be stashed by {@link #waiting() waiting}
- * while the Actor is in processing state.
- * @return
- */
- override def orderedTypes(): Seq[Class[_]] = Seq(classOf[ZMQMessage])
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala
deleted file mode 100644
index b8643f5..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{SocketManager, ZMQMessage}
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a reply socket.
- *
- * @param connection The address to bind to
- * @param listener The actor to send incoming messages back to
- */
-class RepSocketActor(connection: String, listener: ActorRef)
- extends Actor with LogLike
-{
- logger.debug(s"Initializing reply socket actor for $connection")
- private val manager: SocketManager = new SocketManager
- private val socket = manager.newRepSocket(connection, (message: Seq[String]) => {
- listener ! ZMQMessage(message.map(ByteString.apply): _*)
- })
-
- override def postStop(): Unit = {
- manager.closeSocket(socket)
- }
-
- override def receive: Actor.Receive = {
- case zmqMessage: ZMQMessage =>
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
- socket.send(frames: _*)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala
deleted file mode 100644
index e38f2a0..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{ZMQMessage, SocketManager}
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a request socket.
- *
- * @param connection The address to connect to
- * @param listener The actor to send incoming messages back to
- */
-class ReqSocketActor(connection: String, listener: ActorRef)
- extends Actor with LogLike
-{
- logger.debug(s"Initializing request socket actor for $connection")
- private val manager: SocketManager = new SocketManager
- private val socket = manager.newReqSocket(connection, (message: Seq[String]) => {
- listener ! ZMQMessage(message.map(ByteString.apply): _*)
- })
-
- override def postStop(): Unit = {
- manager.closeSocket(socket)
- }
-
- override def receive: Actor.Receive = {
- case zmqMessage: ZMQMessage =>
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
- socket.send(frames: _*)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala
deleted file mode 100644
index 6aa3bc5..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{SocketManager, ZMQMessage}
-import com.ibm.spark.utils.LogLike
-import org.zeromq.ZMQ
-
-/**
- * Represents an actor containing a router socket.
- *
- * @param connection The address to bind to
- * @param listener The actor to send incoming messages back to
- */
-class RouterSocketActor(connection: String, listener: ActorRef)
- extends Actor with LogLike
-{
- logger.debug(s"Initializing router socket actor for $connection")
- private val manager: SocketManager = new SocketManager
- private val socket = manager.newRouterSocket(connection, (message: Seq[String]) => {
- listener ! ZMQMessage(message.map(ByteString.apply): _*)
- })
-
- override def postStop(): Unit = {
- manager.closeSocket(socket)
- }
-
- override def receive: Actor.Receive = {
- case zmqMessage: ZMQMessage =>
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
- socket.send(frames: _*)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala
deleted file mode 100644
index 8fef496..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.actors
-
-import akka.actor.{Actor, ActorRef}
-import akka.util.ByteString
-import com.ibm.spark.communication.{ZMQMessage, SocketManager}
-import com.ibm.spark.utils.LogLike
-
-/**
- * Represents an actor containing a subscribe socket.
- *
- * @param connection The address to connect to
- * @param listener The actor to send incoming messages back to
- */
-class SubSocketActor(connection: String, listener: ActorRef)
- extends Actor with LogLike
-{
- logger.debug(s"Initializing subscribe socket actor for $connection")
- private val manager: SocketManager = new SocketManager
- private val socket = manager.newSubSocket(connection, (message: Seq[String]) => {
- listener ! ZMQMessage(message.map(ByteString.apply): _*)
- })
-
- override def postStop(): Unit = {
- manager.closeSocket(socket)
- }
-
- override def receive: Actor.Receive = {
- case _ =>
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala b/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala
deleted file mode 100644
index 9f44177..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.security
-
-import javax.crypto.Mac
-import javax.crypto.spec.SecretKeySpec
-
-import com.ibm.spark.communication.security.HmacAlgorithm.HmacAlgorithm
-
-object HmacAlgorithm extends Enumeration {
- type HmacAlgorithm = Value
-
- def apply(key: String) = Value(key)
-
- val MD5 = Value("HmacMD5")
- val SHA1 = Value("HmacSHA1")
- val SHA256 = Value("HmacSHA256")
-}
-
-object Hmac {
-
- def apply(key: String, algorithm: HmacAlgorithm = HmacAlgorithm.SHA256) =
- new Hmac(key, algorithm)
-
- def newMD5(key: String): Hmac = this(key, HmacAlgorithm.MD5)
- def newSHA1(key: String): Hmac = this(key, HmacAlgorithm.SHA1)
- def newSHA256(key: String): Hmac = this(key, HmacAlgorithm.SHA256)
-}
-
-class Hmac(
- val key: String,
- val algorithm: HmacAlgorithm = HmacAlgorithm.SHA256
-) {
-
- private var mac: Mac = _
- private var secretKeySpec: SecretKeySpec = _
-
- if (key.nonEmpty) {
- mac = Mac.getInstance(algorithm.toString)
- secretKeySpec = new SecretKeySpec(key.getBytes, algorithm.toString)
- mac.init(secretKeySpec)
- }
-
- def apply(items: String*): String = digest(items)
-
- def digest(items: Seq[String]): String = if (key.nonEmpty) {
- mac synchronized {
- items.map(_.getBytes("UTF-8")).foreach(mac.update)
- mac.doFinal().map("%02x" format _).mkString
- }
- } else ""
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala b/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala
deleted file mode 100644
index c3fabd7..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.security
-
-import akka.actor.Actor
-import com.ibm.spark.communication.utils.OrderedSupport
-import com.ibm.spark.utils.LogLike
-
-/**
- * Verifies whether or not a kernel message has a valid signature.
- * @param hmac The HMAC to use for signature validation
- */
-class SignatureCheckerActor(
- private val hmac: Hmac
-) extends Actor with LogLike with OrderedSupport {
- override def receive: Receive = {
- case (signature: String, blob: Seq[_]) => withProcessing {
- val stringBlob: Seq[String] = blob.map(_.toString)
- val hmacString = hmac(stringBlob: _*)
- val isValidSignature = hmacString == signature
- logger.trace(s"Signature ${signature} validity checked against " +
- s"hmac ${hmacString} with outcome ${isValidSignature}")
- sender ! isValidSignature
- }
- }
-
- /**
- * Defines the types that will be stashed by {@link #waiting() waiting}
- * while the Actor is in processing state.
- * @return
- */
- override def orderedTypes(): Seq[Class[_]] = Seq(classOf[(String, Seq[_])])
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala b/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala
deleted file mode 100644
index f381644..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.security
-
-import akka.actor.{Props, ActorRef, Actor}
-import akka.util.Timeout
-import com.ibm.spark.communication.utils.OrderedSupport
-import com.ibm.spark.kernel.protocol.v5.KernelMessage
-import com.ibm.spark.utils.LogLike
-
-import scala.concurrent.duration._
-import akka.pattern.ask
-import akka.pattern.pipe
-
-class SignatureManagerActor(
- key: String, scheme: String
-) extends Actor with LogLike with OrderedSupport {
- private val hmac = Hmac(key, HmacAlgorithm(scheme))
-
- def this(key: String) = this(key, HmacAlgorithm.SHA256.toString)
-
- // NOTE: Required to provide the execution context for futures with akka
- import context._
-
- // NOTE: Required for ask (?) to function... maybe can define elsewhere?
- implicit val timeout = Timeout(5.seconds)
-
- //
- // List of child actors that the signature manager contains
- //
- private var signatureChecker: ActorRef = _
- private var signatureProducer: ActorRef = _
-
- /**
- * Initializes all child actors performing tasks for the interpreter.
- */
- override def preStart() = {
- signatureChecker = context.actorOf(
- Props(classOf[SignatureCheckerActor], hmac),
- name = SignatureManagerChildActorType.SignatureChecker.toString
- )
- signatureProducer = context.actorOf(
- Props(classOf[SignatureProducerActor], hmac),
- name = SignatureManagerChildActorType.SignatureProducer.toString
- )
- }
-
- override def receive: Receive = {
- // Check blob strings for matching digest
- case (signature: String, blob: Seq[_]) =>
- startProcessing()
- val destActor = sender()
- val sigFuture = signatureChecker ? ((signature, blob))
-
- sigFuture foreach { case isValid =>
- destActor ! isValid
- finishedProcessing()
- }
-
- case message: KernelMessage =>
- startProcessing()
- val destActor = sender()
-
- // TODO: Proper error handling for possible exception from mapTo
- val sigFuture = (signatureProducer ? message).mapTo[String].map(
- result => message.copy(signature = result)
- )
-
- sigFuture foreach { case kernelMessage =>
- destActor ! kernelMessage
- finishedProcessing()
- }
- }
-
- /**
- * Defines the types that will be stashed by {@link #waiting() waiting}
- * while the Actor is in processing state.
- * @return
- */
- override def orderedTypes(): Seq[Class[_]] = Seq(
- classOf[(String, Seq[_])],
- classOf[KernelMessage]
- )
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala b/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala
deleted file mode 100644
index 36b5688..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.security
-
-import akka.actor.Actor
-import com.ibm.spark.communication.utils.OrderedSupport
-import com.ibm.spark.kernel.protocol.v5.KernelMessage
-import com.ibm.spark.utils.LogLike
-import play.api.libs.json.Json
-
-/**
- * Constructs a signature from any kernel message received.
- * @param hmac The HMAC to use for signature construction
- */
-class SignatureProducerActor(
- private val hmac: Hmac
-) extends Actor with LogLike with OrderedSupport {
- override def receive: Receive = {
- case message: KernelMessage => withProcessing {
- val signature = hmac(
- Json.stringify(Json.toJson(message.header)),
- Json.stringify(Json.toJson(message.parentHeader)),
- Json.stringify(Json.toJson(message.metadata)),
- message.contentString
- )
- sender ! signature
- }
- }
-
- /**
- * Defines the types that will be stashed by {@link #waiting() waiting}
- * while the Actor is in processing state.
- * @return
- */
- override def orderedTypes(): Seq[Class[_]] = Seq(classOf[KernelMessage])
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/package.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/package.scala b/communication/src/main/scala/com/ibm/spark/communication/security/package.scala
deleted file mode 100644
index 5c5b1d5..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/security/package.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication
-
-package object security {
- object SecurityActorType extends Enumeration {
- type SecurityActorType = Value
-
- val SignatureManager = Value("signature_manager")
- }
-
- object SignatureManagerChildActorType extends Enumeration {
- type SignatureManagerChildActorType = Value
-
- val SignatureChecker = Value("signature_checker")
- val SignatureProducer = Value("signature_producer")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala
deleted file mode 100644
index c95eb69..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMsg
-
-/**
- * Represents a socket implemented using JeroMQ.
- *
- * @param runnable The underlying ZeroMQ socket runnable to use for the thread
- * managed by this socket
- */
-class JeroMQSocket(private val runnable: ZeroMQSocketRunnable)
- extends SocketLike {
-
- private val socketThread = new Thread(runnable)
- socketThread.start()
-
- /**
- * Sends a message using this socket.
- *
- * @param message The message to send
- */
- override def send(message: String*): Unit = {
- assert(isAlive, "Socket is not alive to be able to send messages!")
-
- runnable.offer(ZMsg.newStringMsg(message: _*))
- }
-
- /**
- * Closes the socket by closing the runnable and waiting for the underlying
- * thread to close.
- */
- override def close(): Unit = {
- runnable.close()
- socketThread.join()
- }
-
- /**
- * Indicates whether or not this socket is alive.
- *
- * @return True if alive (thread running), otherwise false
- */
- override def isAlive: Boolean = socketThread.isAlive
-
- /**
- * Indicates whether or not this socket is ready to send/receive messages.
- *
- * @return True if ready (runnable processing messages), otherwise false
- */
- override def isReady: Boolean = runnable.isProcessing
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala
deleted file mode 100644
index 43f3f3d..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMQ.{Socket, Context}
-
-/**
- * Represents the runnable component of a socket specifically targeted towards
- * publish sockets. No incoming messages are processed.
- *
- * @param context The ZMQ context to use with this runnable to create a socket
- * @param socketOptions The options to use when creating the socket
- */
-class PubSocketRunnable(
- private val context: Context,
- private val socketOptions: SocketOption*
-) extends ZeroMQSocketRunnable(
- context,
- PubSocket,
- None,
- socketOptions: _*
-) {
- /** Does nothing. */
- override protected def processNextInboundMessage(
- socket: Socket,
- flags: Int
- ): Unit = {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala
deleted file mode 100644
index 0aa527f..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMQ.{Socket, Context}
-
-/**
- * Represents the runnable component of a socket that processes messages and
- * sends messages placed on an outbound queue. Targeted towards the request
- * socket, this runnable ensures that a message is sent out first and then a
- * response is received before sending the next message.
- *
- * @param context The ZMQ context to use with this runnable to create a socket
- * @param inboundMessageCallback The callback to invoke when receiving a message
- * on the socket created
- * @param socketOptions The options to use when creating the socket
- */
-class ReqSocketRunnable(
- private val context: Context,
- private val inboundMessageCallback: Option[(Seq[String]) => Unit],
- private val socketOptions: SocketOption*
-) extends ZeroMQSocketRunnable(
- context,
- ReqSocket,
- inboundMessageCallback,
- socketOptions: _*
-) {
- /** Does nothing. */
- override protected def processNextInboundMessage(
- socket: Socket,
- flags: Int
- ): Unit = {}
-
- /**
- * Sends a message and then waits for an incoming response (if a message
- * was sent from the outbound queue).
- *
- * @param socket The socket to use when sending the message
- *
- * @return True if a message was sent, otherwise false
- */
- override protected def processNextOutboundMessage(socket: Socket): Boolean = {
- val shouldReceiveMessage = super.processNextOutboundMessage(socket)
-
- if (shouldReceiveMessage) {
- super.processNextInboundMessage(socket, 0)
- }
-
- shouldReceiveMessage
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala
deleted file mode 100644
index 9bf752d..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-/**
- * Represents a generic interface for socket communication.
- */
-trait SocketLike {
- /**
- * Sends a message through the socket if alive.
- *
- * @throws AssertionError If the socket is not alive when attempting to send
- * a message
- *
- * @param message The message to send
- */
- def send(message: String*): Unit
-
- /**
- * Closes the socket, marking it no longer able to process or send messages.
- */
- def close(): Unit
-
- /**
- * Returns whether or not the socket is alive (processing new messages and
- * capable of sending out messages).
- *
- * @return True if alive, otherwise false
- */
- def isAlive: Boolean
-
- /**
- * Returns whether or not the socket is ready to send/receive messages.
- *
- * @return True if ready, otherwise false
- */
- def isReady: Boolean
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala
deleted file mode 100644
index 7685239..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMQ
-
-/** Represents an option to provide to a socket. */
-sealed trait SocketOption
-
-/**
- * Represents the linger option used to communicate the millisecond duration
- * to continue processing messages after the socket has been told to close.
- *
- * @note Provide -1 as the duration to wait until all messages are processed
- *
- * @param milliseconds The duration in milliseconds
- */
-case class Linger(milliseconds: Int) extends SocketOption
-
-/**
- * Represents the subscribe option used to filter messages coming into a
- * socket subscribing to a publisher. Uses the provided byte prefix to filter
- * incoming messages.
- *
- * @param topic The array of bytes to use as a filter based on the
- * bytes at the beginning of incoming messages
- */
-case class Subscribe(topic: Array[Byte]) extends SocketOption
-object Subscribe {
- val all = Subscribe(ZMQ.SUBSCRIPTION_ALL)
-}
-
-/**
- * Represents the identity option used to identify the socket.
- *
- * @param identity The identity to use with the socket
- */
-case class Identity(identity: Array[Byte]) extends SocketOption
-
-/**
- * Represents the bind option used to tell the socket what address to bind to.
- *
- * @param address The address for the socket to use
- */
-case class Bind(address: String) extends SocketOption
-
-/**
- * Represents the connect option used to tell the socket what address to
- * connect to.
- *
- * @param address The address for the socket to use
- */
-case class Connect(address: String) extends SocketOption
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala
deleted file mode 100644
index 6c033cc..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-/**
- * Represents the interface for a runnable used to send and receive messages
- * for a socket.
- *
- * @param inboundMessageCallback The callback to use when receiving a message
- * through this runnable
- */
-abstract class SocketRunnable[T](
- private val inboundMessageCallback: Option[(Seq[String]) => Unit]
-) extends Runnable {
-
- /** The collection of messages to be sent out through the socket. */
- val outboundMessages: ConcurrentLinkedQueue[T] =
- new ConcurrentLinkedQueue[T]()
-
- /**
- * Attempts to add a new message to the outbound queue to be sent out.
- *
- * @param message The message to add to the queue
- *
- * @return True if successfully queued the message, otherwise false
- */
- def offer(message: T): Boolean = outboundMessages.offer(message)
-
- /**
- * Indicates whether or not the runnable is processing messages (both
- * sending and receiving).
- *
- * @return True if processing, otherwise false
- */
- def isProcessing: Boolean
-
- /**
- * Closes the runnable such that it no longer processes messages and also
- * closes the underlying socket associated with the runnable.
- */
- def close(): Unit
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala
deleted file mode 100644
index 7062d85..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import org.zeromq.ZMQ
-
-/**
- * Represents the type option used to indicate the type of socket to create.
- *
- * @param `type` The type as an integer
- */
-sealed class SocketType(val `type`: Int)
-
-/** Represents a publish socket. */
-case object PubSocket extends SocketType(ZMQ.PUB)
-
-/** Represents a subscribe socket. */
-case object SubSocket extends SocketType(ZMQ.SUB)
-
-/** Represents a reply socket. */
-case object RepSocket extends SocketType(ZMQ.REP)
-
-/** Represents a request socket. */
-case object ReqSocket extends SocketType(ZMQ.REQ)
-
-/** Represents a router socket. */
-case object RouterSocket extends SocketType(ZMQ.ROUTER)
-
-/** Represents a dealer socket. */
-case object DealerSocket extends SocketType(ZMQ.DEALER)
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
deleted file mode 100644
index 6fee716..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.communication.socket
-
-import com.ibm.spark.utils.LogLike
-import org.zeromq.{ZMsg, ZMQ}
-import org.zeromq.ZMQ.Context
-
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-/**
- * Represents the runnable component of a socket that processes messages and
- * sends messages placed on an outbound queue.
- *
- * @param context The ZMQ context to use with this runnable to create a socket
- * @param socketType The type of socket to create
- * @param inboundMessageCallback The callback to invoke when receiving a message
- * on the socket created
- * @param socketOptions The options to use when creating the socket
- */
-class ZeroMQSocketRunnable(
- private val context: Context,
- private val socketType: SocketType,
- private val inboundMessageCallback: Option[(Seq[String]) => Unit],
- private val socketOptions: SocketOption*
-) extends SocketRunnable[ZMsg](inboundMessageCallback)
- with LogLike {
- require(socketOptions.count {
- case _: Bind => true
- case _: Connect => true
- case _ => false
- } == 1, "ZeroMQ socket needs exactly one bind or connect!")
-
- @volatile private var notClosed: Boolean = true
- @volatile private var _isProcessing: Boolean = false
-
- /**
- * Indicates the processing state of this runnable.
- *
- * @return True if processing messages, otherwise false
- */
- override def isProcessing: Boolean = _isProcessing
-
- /**
- * Processes the provided options, performing associated actions on the
- * specified socket.
- *
- * @param socket The socket to apply actions on
- */
- protected def processOptions(socket: ZMQ.Socket): Unit = {
- val socketOptionsString = socketOptions.map("\n- " + _.toString).mkString("")
- logger.trace(
- s"Processing options for socket $socketType: $socketOptionsString"
- )
-
- // Split our options based on connection (bind/connect) and everything else
- val (connectionOptions, otherOptions) = socketOptions.partition {
- case Bind(_) | Connect(_) => true
- case _ => false
- }
-
- // Apply non-connection options first since some (like identity) must be
- // run before the socket does a bind/connect
- otherOptions.foreach {
- case Linger(milliseconds) => socket.setLinger(milliseconds)
- case Subscribe(topic) => socket.subscribe(topic)
- case Identity(identity) => socket.setIdentity(identity)
- case option => logger.warn(s"Unknown option: $option")
- }
-
- // Perform our bind or connect
- connectionOptions.foreach {
- case Bind(address) => socket.bind(address)
- case Connect(address) => socket.connect(address)
- case option =>
- logger.warn(s"Unknown connection option: $option")
- }
-
- _isProcessing = true
- }
-
- /**
- * Sends the next outbound message from the outbound message queue.
- *
- * @param socket The socket to use when sending the message
- *
- * @return True if a message was sent, otherwise false
- */
- protected def processNextOutboundMessage(socket: ZMQ.Socket): Boolean = {
- val message = Option(outboundMessages.poll())
-
- message.foreach(_.send(socket))
-
- message.nonEmpty
- }
-
- /**
- * Retrieves the next inbound message (if available) and invokes the
- * inbound message callback.
- *
- * @param socket The socket whose next incoming message to retrieve
- */
- protected def processNextInboundMessage(
- socket: ZMQ.Socket,
- flags: Int = ZMQ.DONTWAIT
- ): Unit = {
- Option(ZMsg.recvMsg(socket, flags)).foreach(zMsg => {
- inboundMessageCallback.foreach(_(zMsg.asScala.toSeq
- .map(zFrame => new String(zFrame.getData, ZMQ.CHARSET))
- ))
- })
- }
-
- /**
- * Creates a new instance of a ZMQ Socket.
- *
- * @param zmqContext The context to use to create the socket
- * @param socketType The type of socket to create
- *
- * @return The new ZMQ.Socket instance
- */
- protected def newZmqSocket(zmqContext: ZMQ.Context, socketType: Int) =
- zmqContext.socket(socketType)
-
- override def run(): Unit = {
- val socket = newZmqSocket(context, socketType.`type`)//context.socket(socketType.`type`)
-
- try {
- processOptions(socket)
-
- while (notClosed) {
- Try(processNextOutboundMessage(socket)).failed.foreach(
- logger.error("Failed to send next outgoing message!", _: Throwable)
- )
- Try(processNextInboundMessage(socket)).failed.foreach(
- logger.error("Failed to retrieve next incoming message!", _: Throwable)
- )
- Thread.sleep(1)
- }
- } catch {
- case ex: Exception =>
- logger.error("Unexpected exception in 0mq socket runnable!", ex)
- } finally {
- try{
- socket.close()
- } catch {
- case ex: Exception =>
- logger.error("Failed to close socket!", _: Throwable)
- }
- }
- }
-
- /**
- * Marks the runnable as closed such that it eventually stops processing
- * messages and closes the socket.
- *
- * @throws AssertionError If the runnable is not processing messages or has
- * already been closed
- */
- override def close(): Unit = {
- assert(_isProcessing && notClosed,
- "Runnable is not processing or is closed!")
-
- _isProcessing = false
- notClosed = false
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala b/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala
deleted file mode 100644
index 8f41861..0000000
--- a/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.communication.utils
-
-import akka.actor.{Actor, Stash}
-import com.ibm.spark.utils.LogLike
-
-/**
- * A trait to enforce ordered processing for messages of particular types.
- */
-trait OrderedSupport extends Actor with Stash with LogLike {
- /**
- * Executes instead of the default receive when the Actor has begun
- * processing. Stashes incoming messages of particular types, defined by
- * {@link #orderedTypes() orderedTypes} function, for later processing. Uses
- * the default receive method for all other types. Upon receiving a
- * FinishedProcessing message, resumes processing all messages with the
- * default receive.
- * @return
- */
- def waiting : Receive = {
- case FinishedProcessing =>
- context.unbecome()
- unstashAll()
- case aVal: Any if (orderedTypes().contains(aVal.getClass)) =>
- logger.trace(s"Stashing message ${aVal} of type ${aVal.getClass}.")
- stash()
- case aVal: Any =>
- logger.trace(s"Forwarding message ${aVal} of type ${aVal.getClass} " +
- "to default receive.")
- receive(aVal)
- }
-
- /**
- * Suspends the default receive method for types defined by the
- * {@link #orderedTypes() orderedTypes} function.
- */
- def startProcessing(): Unit = {
- logger.debug("Actor is in processing state and will stash messages of " +
- s"types: ${orderedTypes.mkString(" ")}")
- context.become(waiting, discardOld = false)
- }
-
- /**
- * Resumes the default receive method for all message types.
- */
- def finishedProcessing(): Unit = {
- logger.debug("Actor is no longer in processing state.")
- self ! FinishedProcessing
- }
-
- /**
- * Executes a block of code, wrapping it in start/finished processing
- * needed for ordered execution.
- *
- * @param block The block to execute
- * @tparam T The return type of the block
- * @return The result of executing the block
- */
- def withProcessing[T](block: => T): T = {
- startProcessing()
- val results = block
- finishedProcessing()
- results
- }
-
- /**
- * Defines the types that will be stashed by {@link #waiting() waiting}
- * while the Actor is in processing state.
- * @return
- */
- def orderedTypes(): Seq[Class[_]]
-
- case object FinishedProcessing
-}