You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lr...@apache.org on 2017/03/16 22:58:00 UTC
incubator-toree git commit: [TOREE-391] Treat zmq ids as bytes
Repository: incubator-toree
Updated Branches:
refs/heads/master 3b6eb2926 -> ca4b86be7
[TOREE-391] Treat zmq ids as bytes
Properly handle ZeroMQ ids as byte arrays
to avoid timeouts during kernel restarts.
Closes #111
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/ca4b86be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/ca4b86be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/ca4b86be
Branch: refs/heads/master
Commit: ca4b86be705ce1fd0901e7d4a3dfb9b53067eb0f
Parents: 3b6eb29
Author: Jim Rhyness <jr...@ca.ibm.com>
Authored: Thu Mar 9 20:43:20 2017 -0500
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Mar 16 15:50:41 2017 -0700
----------------------------------------------------------------------
.../kernel/protocol/v5/client/Utilities.scala | 8 +++----
.../v5/client/socket/IOPubClientSpec.scala | 4 ++--
.../v5/client/socket/ShellClientSpec.scala | 4 ++--
.../toree/communication/SocketManager.scala | 10 ++++----
.../actors/DealerSocketActor.scala | 5 ++--
.../communication/actors/PubSocketActor.scala | 3 +--
.../communication/actors/RepSocketActor.scala | 5 ++--
.../communication/actors/ReqSocketActor.scala | 5 ++--
.../actors/RouterSocketActor.scala | 9 ++++---
.../communication/actors/SubSocketActor.scala | 2 +-
.../communication/socket/JeroMQSocket.scala | 7 ++++--
.../socket/ReqSocketRunnable.scala | 2 +-
.../toree/communication/socket/SocketLike.scala | 2 +-
.../communication/socket/SocketRunnable.scala | 2 +-
.../socket/ZeroMQSocketRunnable.scala | 6 ++---
.../JeroMQSocketIntegrationSpec.scala | 24 +++++++++----------
.../communication/socket/JeroMQSocketSpec.scala | 4 ++--
.../socket/ZeroMQSocketRunnableSpec.scala | 2 +-
.../toree/kernel/api/DisplayMethods.scala | 4 ++--
.../apache/toree/kernel/api/StreamMethods.scala | 2 +-
.../protocol/v5/dispatch/StatusDispatch.scala | 2 +-
.../v5/handler/ExecuteRequestHandler.scala | 4 ++--
.../kernel/protocol/v5/kernel/Utilities.scala | 6 ++---
.../protocol/v5/stream/KernelOutputStream.scala | 2 +-
.../v5/handler/CommInfoRequestHandlerSpec.scala | 6 ++---
.../handler/KernelInfoRequestHandlerSpec.scala | 2 +-
.../protocol/v5/kernel/UtilitiesSpec.scala | 10 ++++----
.../v5/relay/KernelMessageRelaySpec.scala | 8 +++----
.../v5/stream/KernelOuputStreamSpec.scala | 3 ++-
.../toree/kernel/protocol/v5/KMBuilder.scala | 2 +-
.../kernel/protocol/v5/KernelMessage.scala | 25 ++++++++++++++++++--
.../kernel/protocol/v5/KMBuilderSpec.scala | 2 +-
.../toree/kernel/protocol/v5/package.scala | 4 ++--
33 files changed, 103 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala
index 274057b..6e653c4 100644
--- a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala
@@ -58,9 +58,9 @@ object Utilities extends LogLike {
val delimiterIndex: Int =
message.frames.indexOf(ByteString("<IDS|MSG>".getBytes))
// TODO Handle the case where there is no delimiter
- val ids: Seq[String] =
+ val ids: Seq[Array[Byte]] =
message.frames.take(delimiterIndex).map(
- (byteString : ByteString) => { new String(byteString.toArray) }
+ (byteString : ByteString) => { byteString.toArray }
)
val header = Json.parse(message.frames(delimiterIndex + 2)).as[Header]
val parentHeader = Json.parse(message.frames(delimiterIndex + 3)).validate[ParentHeader].fold[ParentHeader](
@@ -80,7 +80,7 @@ object Utilities extends LogLike {
implicit def KernelMessageToZMQMessage(kernelMessage : KernelMessage) : ZMQMessage = {
val frames: scala.collection.mutable.ListBuffer[ByteString] = scala.collection.mutable.ListBuffer()
- kernelMessage.ids.map((id : String) => frames += id )
+ kernelMessage.ids.map((id : Array[Byte]) => frames += ByteString.apply(id) )
frames += "<IDS|MSG>"
frames += kernelMessage.signature
frames += Json.toJson(kernelMessage.header).toString()
@@ -106,7 +106,7 @@ object Utilities extends LogLike {
val header = Header(
id, "spark", sessionId, MessageType.Incoming.ExecuteRequest.toString, "5.0")
- KMBuilder().withIds(Seq[String]()).withSignature("").withHeader(header)
+ KMBuilder().withIds(Seq[Array[Byte]]()).withSignature("").withHeader(header)
.withParentHeader(HeaderBuilder.empty).withContentString(message).build
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
index 142a3a8..98db662 100644
--- a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
@@ -238,7 +238,7 @@ class IOPubClientSpec extends TestKit(ActorSystem(
MessageType.Incoming.ExecuteRequest.toString, "5.0")
val kernelMessage = new KernelMessage(
- Seq[String](),
+ Seq[Array[Byte]](),
"",
header,
parentHeader,
@@ -272,7 +272,7 @@ class IOPubClientSpec extends TestKit(ActorSystem(
MessageType.Outgoing.Stream.toString, "5.0")
val kernelMessage = new KernelMessage(
- Seq[String](),
+ Seq[Array[Byte]](),
"",
header,
null,
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
index 382304e..72cc9ee 100644
--- a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
@@ -63,7 +63,7 @@ class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec"))
"5.0"
)
val kernelMessage = KernelMessage(
- Seq[String](), "",
+ Seq[Array[Byte]](), "",
header, HeaderBuilder.empty,
Metadata(), Json.toJson(request).toString
)
@@ -77,4 +77,4 @@ class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec"))
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala b/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
index c660848..0422334 100644
--- a/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
@@ -75,7 +75,7 @@ class SocketManager {
*/
def newReqSocket(
address: String,
- inboundMessageCallback: (Seq[String]) => Unit
+ inboundMessageCallback: (Seq[Array[Byte]]) => Unit
): SocketLike = withNewContext{ ctx =>
new JeroMQSocket(new ReqSocketRunnable(
ctx,
@@ -95,7 +95,7 @@ class SocketManager {
*/
def newRepSocket(
address: String,
- inboundMessageCallback: (Seq[String]) => Unit
+ inboundMessageCallback: (Seq[Array[Byte]]) => Unit
): SocketLike = withNewContext{ ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
ctx,
@@ -133,7 +133,7 @@ class SocketManager {
*/
def newSubSocket(
address: String,
- inboundMessageCallback: (Seq[String]) => Unit
+ inboundMessageCallback: (Seq[Array[Byte]]) => Unit
): SocketLike = withNewContext { ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
ctx,
@@ -155,7 +155,7 @@ class SocketManager {
*/
def newRouterSocket(
address: String,
- inboundMessageCallback: (Seq[String]) => Unit
+ inboundMessageCallback: (Seq[Array[Byte]]) => Unit
): SocketLike = withNewContext { ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
ctx,
@@ -176,7 +176,7 @@ class SocketManager {
*/
def newDealerSocket(
address: String,
- inboundMessageCallback: (Seq[String]) => Unit,
+ inboundMessageCallback: (Seq[Array[Byte]]) => Unit,
identity: String = UUID.randomUUID().toString
): SocketLike = withNewContext{ ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala
index c5b4575..ad0f7a9 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala
@@ -33,7 +33,7 @@ class DealerSocketActor(connection: String, listener: ActorRef)
{
logger.debug(s"Initializing dealer socket actor for $connection")
private val manager: SocketManager = new SocketManager
- private val socket = manager.newDealerSocket(connection, (message: Seq[String]) => {
+ private val socket = manager.newDealerSocket(connection, (message: Seq[Array[Byte]]) => {
listener ! ZMQMessage(message.map(ByteString.apply): _*)
})
@@ -43,8 +43,7 @@ class DealerSocketActor(connection: String, listener: ActorRef)
override def receive: Actor.Receive = {
case zmqMessage: ZMQMessage =>
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
+ val frames = zmqMessage.frames.map(byteString => byteString.toArray )
socket.send(frames: _*)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala
index e0ca2f8..3a7c770 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala
@@ -45,8 +45,7 @@ class PubSocketActor(connection: String)
override def receive: Actor.Receive = {
case zmqMessage: ZMQMessage => withProcessing {
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
+ val frames = zmqMessage.frames.map(byteString => byteString.toArray )
socket.send(frames: _*)
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala
index 6a3d1ef..c406f15 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala
@@ -33,7 +33,7 @@ class RepSocketActor(connection: String, listener: ActorRef)
{
logger.debug(s"Initializing reply socket actor for $connection")
private val manager: SocketManager = new SocketManager
- private val socket = manager.newRepSocket(connection, (message: Seq[String]) => {
+ private val socket = manager.newRepSocket(connection, (message: Seq[Array[Byte]]) => {
listener ! ZMQMessage(message.map(ByteString.apply): _*)
})
@@ -43,8 +43,7 @@ class RepSocketActor(connection: String, listener: ActorRef)
override def receive: Actor.Receive = {
case zmqMessage: ZMQMessage =>
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
+ val frames = zmqMessage.frames.map(byteString => byteString.toArray )
socket.send(frames: _*)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala
index 71a99a5..ec22a92 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala
@@ -33,7 +33,7 @@ class ReqSocketActor(connection: String, listener: ActorRef)
{
logger.debug(s"Initializing request socket actor for $connection")
private val manager: SocketManager = new SocketManager
- private val socket = manager.newReqSocket(connection, (message: Seq[String]) => {
+ private val socket = manager.newReqSocket(connection, (message: Seq[Array[Byte]]) => {
listener ! ZMQMessage(message.map(ByteString.apply): _*)
})
@@ -43,8 +43,7 @@ class ReqSocketActor(connection: String, listener: ActorRef)
override def receive: Actor.Receive = {
case zmqMessage: ZMQMessage =>
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
+ val frames = zmqMessage.frames.map(byteString => byteString.toArray )
socket.send(frames: _*)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala
index 67cbd80..2501909 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala
@@ -33,7 +33,7 @@ class RouterSocketActor(connection: String, listener: ActorRef)
{
logger.debug(s"Initializing router socket actor for $connection")
private val manager: SocketManager = new SocketManager
- private val socket = manager.newRouterSocket(connection, (message: Seq[String]) => {
+ private val socket = manager.newRouterSocket(connection, (message: Seq[Array[Byte]]) => {
listener ! ZMQMessage(message.map(ByteString.apply): _*)
})
@@ -43,8 +43,7 @@ class RouterSocketActor(connection: String, listener: ActorRef)
override def receive: Actor.Receive = {
case zmqMessage: ZMQMessage =>
- val frames = zmqMessage.frames.map(byteString =>
- new String(byteString.toArray, ZMQ.CHARSET))
- socket.send(frames: _*)
+ val frames = zmqMessage.frames.map(byteString => byteString.toArray )
+ socket.send(frames: _*)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala
index 7c39fb5..0c82823 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala
@@ -32,7 +32,7 @@ class SubSocketActor(connection: String, listener: ActorRef)
{
logger.debug(s"Initializing subscribe socket actor for $connection")
private val manager: SocketManager = new SocketManager
- private val socket = manager.newSubSocket(connection, (message: Seq[String]) => {
+ private val socket = manager.newSubSocket(connection, (message: Seq[Array[Byte]]) => {
listener ! ZMQMessage(message.map(ByteString.apply): _*)
})
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala b/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala
index e8942de..236945b 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala
@@ -35,10 +35,13 @@ class JeroMQSocket(private val runnable: ZeroMQSocketRunnable)
*
* @param message The message to send
*/
- override def send(message: String*): Unit = {
+ override def send(message: Array[Byte]*): Unit = {
assert(isAlive, "Socket is not alive to be able to send messages!")
- runnable.offer(ZMsg.newStringMsg(message: _*))
+ val msg = new ZMsg()
+ for( frame <- message ) msg.add( frame )
+
+ runnable.offer( msg )
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala b/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
index 81355fa..cd3d85e 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
@@ -31,7 +31,7 @@ import org.zeromq.ZMQ.{Socket, Context}
*/
class ReqSocketRunnable(
private val context: Context,
- private val inboundMessageCallback: Option[(Seq[String]) => Unit],
+ private val inboundMessageCallback: Option[(Seq[Array[Byte]]) => Unit],
private val socketOptions: SocketOption*
) extends ZeroMQSocketRunnable(
context,
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala b/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala
index 504ad32..d5b6d31 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala
@@ -28,7 +28,7 @@ trait SocketLike {
*
* @param message The message to send
*/
- def send(message: String*): Unit
+ def send(message: Array[Byte]*): Unit
/**
* Closes the socket, marking it no longer able to process or send messages.
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala b/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala
index ee43bc7..8701e04 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
* through this runnable
*/
abstract class SocketRunnable[T](
- private val inboundMessageCallback: Option[(Seq[String]) => Unit]
+ private val inboundMessageCallback: Option[(Seq[Array[Byte]]) => Unit]
) extends Runnable {
/** The collection of messages to be sent out through the socket. */
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala b/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
index 377e918..0464d57 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
@@ -36,7 +36,7 @@ import scala.util.Try
class ZeroMQSocketRunnable(
private val context: Context,
private val socketType: SocketType,
- private val inboundMessageCallback: Option[(Seq[String]) => Unit],
+ private val inboundMessageCallback: Option[(Seq[Array[Byte]]) => Unit],
private val socketOptions: SocketOption*
) extends SocketRunnable[ZMsg](inboundMessageCallback)
with LogLike {
@@ -120,8 +120,8 @@ class ZeroMQSocketRunnable(
flags: Int = ZMQ.DONTWAIT
): Unit = {
Option(ZMsg.recvMsg(socket, flags)).foreach(zMsg => {
- inboundMessageCallback.foreach(_(zMsg.asScala.toSeq
- .map(zFrame => new String(zFrame.getData, ZMQ.CHARSET))
+ inboundMessageCallback.foreach(_(zMsg.asScala.toSeq
+ .map(zFrame => zFrame.getData)
))
})
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala
----------------------------------------------------------------------
diff --git a/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala b/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala
index 6c2e311..2cd5b0b 100644
--- a/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala
+++ b/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala
@@ -47,8 +47,8 @@ class JeroMQSocketIntegrationSpec extends FunSpec
it("should be able to communicate") {
val address =s"inproc://${this.hashCode()}"
- val replyMessages = new ConcurrentLinkedDeque[Seq[String]]()
- val replyCallback: (Seq[String]) => Unit = { msg: Seq[String] =>
+ val replyMessages = new ConcurrentLinkedDeque[Seq[Array[Byte]]]()
+ val replyCallback: (Seq[Array[Byte]]) => Unit = { msg: Seq[Array[Byte]] =>
replyMessages.offer(msg)
}
val reply = socketManager.newRouterSocket(
@@ -60,8 +60,8 @@ class JeroMQSocketIntegrationSpec extends FunSpec
reply.isReady should be (true)
}
- val requestMessages = new ConcurrentLinkedDeque[Seq[String]]()
- val requestCallback: (Seq[String]) => Unit = { msg: Seq[String] =>
+ val requestMessages = new ConcurrentLinkedDeque[Seq[Array[Byte]]]()
+ val requestCallback: (Seq[Array[Byte]]) => Unit = { msg: Seq[Array[Byte]] =>
requestMessages.offer(msg)
}
val request = socketManager.newDealerSocket(
@@ -73,7 +73,7 @@ class JeroMQSocketIntegrationSpec extends FunSpec
request.isReady should be (true)
}
- request.send("Message from the request to the reply")
+ request.send("Message from the request to the reply".getBytes)
eventually {
replyMessages.size() should be(1)
@@ -88,8 +88,8 @@ class JeroMQSocketIntegrationSpec extends FunSpec
it("should be able to communicate"){
val address =s"inproc://${this.hashCode()}"
- val routerMessages = new ConcurrentLinkedDeque[Seq[String]]()
- val routerCallback: (Seq[String]) => Unit = { msg: Seq[String] =>
+ val routerMessages = new ConcurrentLinkedDeque[Seq[Array[Byte]]]()
+ val routerCallback: (Seq[Array[Byte]]) => Unit = { msg: Seq[Array[Byte]] =>
routerMessages.offer(msg)
}
val router = socketManager.newRouterSocket(
@@ -104,14 +104,14 @@ class JeroMQSocketIntegrationSpec extends FunSpec
val dealer = socketManager.newDealerSocket(
address,
- (_: Seq[String]) => {}
+ (_: Seq[Array[Byte]]) => {}
)
eventually {
dealer.isReady should be (true)
}
- dealer.send("Message from the dealer to the router")
+ dealer.send("Message from the dealer to the router".getBytes)
eventually {
routerMessages.size() should be(1)
@@ -135,8 +135,8 @@ class JeroMQSocketIntegrationSpec extends FunSpec
publisher.isReady should be (true)
}
- val subscriberMessages = new ConcurrentLinkedDeque[Seq[String]]()
- val subscriberCallback: (Seq[String]) => Unit = { msg: Seq[String] =>
+ val subscriberMessages = new ConcurrentLinkedDeque[Seq[Array[Byte]]]()
+ val subscriberCallback: (Seq[Array[Byte]]) => Unit = { msg: Seq[Array[Byte]] =>
subscriberMessages.offer(msg)
}
val subscriber = socketManager.newSubSocket(
@@ -148,7 +148,7 @@ class JeroMQSocketIntegrationSpec extends FunSpec
subscriber.isReady should be (true)
}
- publisher.send("Message form the publisher to the subscriber")
+ publisher.send("Message form the publisher to the subscriber".getBytes)
eventually {
subscriberMessages.size() should be(1)
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala
----------------------------------------------------------------------
diff --git a/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala b/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala
index d742a75..078a282 100644
--- a/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala
+++ b/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala
@@ -53,7 +53,7 @@ class JeroMQSocketSpec extends FunSpec with MockitoSugar
val message: String = "Some Message"
val expected = ZMsg.newStringMsg(message)
- socket.send(message)
+ socket.send(message.getBytes)
verify(runnable).offer(expected)
}
@@ -61,7 +61,7 @@ class JeroMQSocketSpec extends FunSpec with MockitoSugar
socket.close()
intercept[AssertionError] {
- socket.send("")
+ socket.send("".getBytes)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
----------------------------------------------------------------------
diff --git a/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
index d16f7a7..aa0e993 100644
--- a/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
+++ b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
@@ -42,7 +42,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
private val socket: ZMQ.Socket,
private val context: Context,
private val socketType: SocketType,
- private val inboundMessageCallback: Option[(Seq[String]) => Unit],
+ private val inboundMessageCallback: Option[(Seq[Array[Byte]]) => Unit],
private val socketOptions: SocketOption*
) extends ZeroMQSocketRunnable(
context,
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala
index 9cf5e29..d6b17e1 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala
@@ -38,7 +38,7 @@ class DisplayMethods(
val displayData = v5.content.DisplayData("user", Map(mimeType -> data), Map())
val kernelMessage = kmBuilder
- .withIds(Seq(v5.content.DisplayData.toTypeString))
+ .withIds(Seq(v5.content.DisplayData.toTypeString.getBytes))
.withHeader(v5.content.DisplayData.toTypeString)
.withContentString(displayData).build
@@ -49,7 +49,7 @@ class DisplayMethods(
val clearOutput = v5.content.ClearOutput(wait)
val kernelMessage = kmBuilder
- .withIds(Seq(v5.content.ClearOutput.toTypeString))
+ .withIds(Seq(v5.content.ClearOutput.toTypeString.getBytes))
.withHeader(v5.content.ClearOutput.toTypeString)
.withContentString(clearOutput).build
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala
index 99ac5e4..56e2051 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala
@@ -30,7 +30,7 @@ class StreamMethods(actorLoader: ActorLoader, parentMessage: KernelMessage)
{
private[api] val kmBuilder = v5.KMBuilder()
.withParent(parentMessage)
- .withIds(Seq(v5.content.StreamContent.toTypeString))
+ .withIds(Seq(v5.content.StreamContent.toTypeString.getBytes))
.withHeader(v5.content.StreamContent.toTypeString)
/**
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala
index 76e326c..5a148d6 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala
@@ -29,7 +29,7 @@ class StatusDispatch(actorLoader: ActorLoader) extends Actor with LogLike {
private def sendStatusMessage(kernelStatus: KernelStatusType, parentHeader: Header) {
// Create the status message and send it to the relay
val km : KernelMessage = KMBuilder()
- .withIds(Seq(MessageType.Outgoing.Status.toString))
+ .withIds(Seq(MessageType.Outgoing.Status.toString.getBytes))
.withSignature("")
.withHeader(MessageType.Outgoing.Status)
.withParentHeader(parentHeader)
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala
index a59608d..96c306e 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala
@@ -98,7 +98,7 @@ class ExecuteRequestHandler(
// Send an ExecuteResult with the result of the code execution
if (executeResult.hasContent) {
val executeResultMsg = skeletonBuilder
- .withIds(Seq(MessageType.Outgoing.ExecuteResult.toString))
+ .withIds(Seq(MessageType.Outgoing.ExecuteResult.toString.getBytes))
.withHeader(MessageType.Outgoing.ExecuteResult)
.withContentString(executeResult).build
relayMsg(executeResultMsg, relayActor)
@@ -166,4 +166,4 @@ class ExecuteRequestHandler(
logKernelMessageAction("Sending to KernelMessageRelay.", km)
relayActor ! km
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
index 6ef4c06..6f5d060 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
@@ -55,9 +55,9 @@ object Utilities extends LogLike {
val delimiterIndex: Int =
message.frames.indexOf(ByteString("<IDS|MSG>".getBytes))
// TODO Handle the case where there is no delimiter
- val ids: Seq[String] =
+ val ids: Seq[Array[Byte]] =
message.frames.take(delimiterIndex).map(
- (byteString : ByteString) => { new String(byteString.toArray) }
+ (byteString : ByteString) => { byteString.toArray }
)
val header = Json.parse(message.frames(delimiterIndex + 2)).as[Header]
// TODO: Investigate better solution than setting parentHeader to null for {}
@@ -78,7 +78,7 @@ object Utilities extends LogLike {
implicit def KernelMessageToZMQMessage(kernelMessage : KernelMessage) : ZMQMessage = {
val frames: scala.collection.mutable.ListBuffer[ByteString] = scala.collection.mutable.ListBuffer()
- kernelMessage.ids.map((id : String) => frames += id )
+ kernelMessage.ids.map((id : Array[Byte]) => frames += ByteString.apply(id) )
frames += "<IDS|MSG>"
frames += kernelMessage.signature
frames += Json.toJson(kernelMessage.header).toString()
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
index 65c5874..5f798b2 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
@@ -106,7 +106,7 @@ class KernelOutputStream(
)
val kernelMessage = kmBuilder
- .withIds(Seq(MessageType.Outgoing.Stream.toString))
+ .withIds(Seq(MessageType.Outgoing.Stream.toString.getBytes))
.withHeader(MessageType.Outgoing.Stream)
.withContentString(streamContent).build
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala
index 0e30747..e60fdec 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala
@@ -65,7 +65,7 @@ class CommInfoRequestHandlerSpec extends TestKit(
describe("Comm Info Request Handler") {
it("should return a KernelMessage containing a comm info response for a specific target name") {
val kernelMessage = new KernelMessage(
- Seq[String](), "test message", header, header, Map[String, String](), "{\"target_name\":\"test.name\"}"
+ Seq[Array[Byte]](), "test message", header, header, Map[String, String](), "{\"target_name\":\"test.name\"}"
)
when(mockCommStorage.getTargets()).thenReturn(Set("test.name"))
@@ -83,7 +83,7 @@ class CommInfoRequestHandlerSpec extends TestKit(
it("should return a KernelMessage containing a comm info response for all comms when target_name is missing from the message") {
val kernelMessage = new KernelMessage(
- Seq[String](), "test message", header, header, Map[String, String](), "{}"
+ Seq[Array[Byte]](), "test message", header, header, Map[String, String](), "{}"
)
when(mockCommStorage.getTargets()).thenReturn(Set("test.name1", "test.name2"))
@@ -103,7 +103,7 @@ class CommInfoRequestHandlerSpec extends TestKit(
it("should return a KernelMessage containing an empty comm info response when the target name value is not found") {
val kernelMessage = new KernelMessage(
- Seq[String](), "test message", header, header, Map[String, String](), "{\"target_name\":\"can't_find_me\"}"
+ Seq[Array[Byte]](), "test message", header, header, Map[String, String](), "{\"target_name\":\"can't_find_me\"}"
)
when(mockCommStorage.getTargets()).thenReturn(Set("test.name"))
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala
index a4af1e5..f477582 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala
@@ -56,7 +56,7 @@ class KernelInfoRequestHandlerSpec extends TestKit(
val header = Header("","","","","")
val kernelMessage = new KernelMessage(
- Seq[String](), "test message", header, header, Map[String, String](), "{}"
+ Seq[Array[Byte]](), "test message", header, header, Map[String, String](), "{}"
)
describe("Kernel Info Request Handler") {
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala
index f2f21b7..cfae79b 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala
@@ -33,7 +33,7 @@ class UtilitiesSpec extends FunSpec with Matchers {
"<PARENT-UUID>", "<PARENT-STRING>", "<PARENT-UUID>", "<PARENT-STRING>", "<PARENT-FLOAT>"
)
val kernelMessage = KernelMessage(
- Seq("<STRING-1>","<STRING-2>"),
+ Seq("<STRING-1>","<STRING-2>").map(x => x.getBytes),
"<SIGNATURE>", header, parentHeader, Map(), "<STRING>"
)
@@ -69,13 +69,13 @@ class UtilitiesSpec extends FunSpec with Matchers {
describe("Utilities") {
describe("implicit #KernelMessageToZMQMessage") {
it("should correctly convert a kernel message to a ZMQMessage") {
- Utilities.KernelMessageToZMQMessage(kernelMessage) should be (zmqMessage)
+ Utilities.KernelMessageToZMQMessage(kernelMessage) should equal (zmqMessage)
}
}
describe("implicit #ZMQMessageToKernelMessage") {
it("should correctly convert a ZMQMessage to a kernel message") {
- Utilities.ZMQMessageToKernelMessage(zmqMessage) should be (kernelMessage)
+ Utilities.ZMQMessageToKernelMessage(zmqMessage) should equal (kernelMessage)
}
}
@@ -83,12 +83,12 @@ class UtilitiesSpec extends FunSpec with Matchers {
it("should convert back to the original message, ZMQ -> Kernel -> ZMQ") {
Utilities.KernelMessageToZMQMessage(
Utilities.ZMQMessageToKernelMessage(zmqMessage)
- ) should be (zmqMessage)
+ ) should equal (zmqMessage)
}
it("should convert back to the original message, Kernel -> ZMQ -> Kernel") {
Utilities.ZMQMessageToKernelMessage(
Utilities.KernelMessageToZMQMessage(kernelMessage)
- ) should be (kernelMessage)
+ ) should equal (kernelMessage)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala
index 0dbcc09..2935f4b 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala
@@ -54,10 +54,10 @@ class KernelMessageRelaySpec extends TestKit(
"<TYPE>", "<VERSION>")
private val parentHeader: Header = Header("<PARENT-UUID>", "<PARENT-USER>",
"<PARENT-SESSION>", "<PARENT-TYPE>", "<PARENT-VERSION>")
- private val incomingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>"),
+ private val incomingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>".getBytes),
"<SIGNATURE>", header.copy(msg_type = IncomingMessageType),
parentHeader, Metadata(), "<CONTENT>")
- private val outgoingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>"),
+ private val outgoingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>".getBytes),
"<SIGNATURE>", header.copy(msg_type = OutgoingMessageType),
incomingKernelMessage.header, Metadata(), "<CONTENT>")
private val incomingZmqStrings = "1" :: "2" :: "3" :: "4" :: Nil
@@ -221,7 +221,7 @@ class KernelMessageRelaySpec extends TestKit(
def sendKernelMessages(n: Int, kernelMessageRelay: ActorRef): Unit ={
// Sends n messages to the relay
(0 until n).foreach (i => {
- val km = KernelMessage(Seq("<ID>"), s"${i}",
+ val km = KernelMessage(Seq("<ID>".getBytes), s"${i}",
header.copy(msg_type = IncomingMessageType), parentHeader,
Metadata(), s"${i}")
kernelMessageRelay ! Tuple2(Seq("SomeString"), km)
@@ -245,4 +245,4 @@ case class ChaoticActor[U](receiveFunc : Any => U) extends Actor {
}
}).start()
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala
index a74946d..c9ee2aa 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala
@@ -237,7 +237,8 @@ class KernelOuputStreamSpec
Then("the ids should be set to execute_result")
val message = kernelOutputRelayProbe
.receiveOne(MaxAkkaTestTimeout).asInstanceOf[KernelMessage]
- message.ids should be (Seq(MessageType.Outgoing.Stream.toString))
+
+ message.ids(0).deep should equal (MessageType.Outgoing.Stream.toString.getBytes.deep)
}
it("should set the message type in the header of the kernel message to an execute_result") {
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala
----------------------------------------------------------------------
diff --git a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala
index 70eeb4b..ccc69a3 100644
--- a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala
+++ b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala
@@ -48,7 +48,7 @@ case class KMBuilder(km: KernelMessage = KernelMessage(
)) {
require(km != null)
- def withIds(newVal: Seq[String]) : KMBuilder =
+ def withIds(newVal: Seq[Array[Byte]]) : KMBuilder =
KMBuilder(this.km.copy(ids = newVal))
def withSignature(newVal: String) : KMBuilder =
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala
----------------------------------------------------------------------
diff --git a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala
index 6ef6663..8eeb536 100644
--- a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala
+++ b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala
@@ -18,10 +18,31 @@
package org.apache.toree.kernel.protocol.v5
case class KernelMessage(
- ids: Seq[String],
+ ids: Seq[Array[Byte]],
signature: String,
header: Header,
parentHeader: ParentHeader, // TODO: This can be an empty json object of {}
metadata: Metadata,
contentString: String
-)
+)
+{
+ /**
+ * Structural equality for kernel messages.
+ *
+ * Array[Byte] only has reference equality, so the generated case-class
+ * equals would treat two messages with identical id bytes as different.
+ * The scalar fields are compared with ==, and the ids are compared
+ * element-wise by content via .deep.
+ *
+ * @param o the object to compare against
+ * @return true only if o is a KernelMessage whose fields and id bytes all match
+ */
+ override def equals ( o: Any ) = o match {
+ case km: KernelMessage => {
+ // Cheap checks first; a length mismatch also guards the indexed loop below.
+ var equal = ( ids.length == km.ids.length && signature == km.signature && header == km.header && parentHeader == km.parentHeader && metadata == km.metadata && contentString == km.contentString )
+ var i = ids.length
+ // Stops at the first differing id (or immediately if a field already differed).
+ while ( equal && ( 0 < i ) ) {
+ i = i - 1
+ equal = (ids(i).deep == km.ids(i).deep )
+ }
+ // Return the computed result as-is; it must not be reset after the loop.
+ equal
+ }
+ case _ => false
+ }
+
+ /**
+ * Hash built from the scalar fields plus the content of every id.
+ *
+ * Each id contributes the sum of its bytes rather than the array's
+ * identity hash, so messages with byte-identical ids hash alike.
+ */
+ override def hashCode: Int = {
+ val fieldHash = signature.## + header.## + parentHeader.## + metadata.## + contentString.##
+ ids.foldLeft(fieldHash)((acc, id) => id.foldLeft(acc)(_ + _))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala
index afdf5b1..01132a8 100644
--- a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala
@@ -73,7 +73,7 @@ class KMBuilderSpec extends FunSpec with Matchers {
describe("withXYZ"){
describe("#withIds"){
it("should produce a KMBuilder with a KernelMessage with ids set") {
- val ids = Seq("baos", "win")
+ val ids = Seq("baos", "win").map(x => x.getBytes)
val builder = KMBuilder().withIds(ids)
builder.km.ids should be (ids)
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
index e30941a..33d2f6b 100644
--- a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
@@ -30,7 +30,7 @@ package object v5Test {
val MockParenHeader: Header = Header("<PARENT-UUID>","<PARENT-USER>","<PARENT-SESSION>",
MessageType.Outgoing.ClearOutput.toString, "<PARENT-VERSION>")
// The actual kernel message
- val MockKernelMessage : KernelMessage = KernelMessage(Seq("<ID>"), "<SIGNATURE>", MockHeader,
+ val MockKernelMessage : KernelMessage = KernelMessage(Seq("<ID>".getBytes), "<SIGNATURE>", MockHeader,
MockParenHeader, Metadata(), "<CONTENT>")
// Use the implicit to convert the KernelMessage to ZMQMessage
//val MockZMQMessage : ZMQMessage = MockKernelMessage
@@ -41,7 +41,7 @@ package object v5Test {
contentString = Json.toJson(MockExecuteRequest).toString
)
val MockKernelMessageWithBadExecuteRequest = new KernelMessage(
- Seq[String](), "test message", MockHeader, MockParenHeader, Map[String, String](),
+ Seq[Array[Byte]](), "test message", MockHeader, MockParenHeader, Map[String, String](),
"""
{"code" : 124 }
"""