You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lr...@apache.org on 2016/01/11 22:01:51 UTC
[09/50] [abbrv] incubator-toree git commit: Attach parent header to comm messages that are generated with a comm writer.
Attach parent header to comm messages that are generated with a comm writer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/d1f93bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/d1f93bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/d1f93bb0
Branch: refs/heads/master
Commit: d1f93bb06f0fefafa0d1876e91680f822ddc12d6
Parents: 9db161f
Author: wellecks <we...@gmail.com>
Authored: Mon Nov 16 15:04:34 2015 -0600
Committer: wellecks <we...@gmail.com>
Committed: Mon Nov 16 15:04:34 2015 -0600
----------------------------------------------------------------------
.../protocol/v5/handler/CommMsgHandler.scala | 10 +++++---
.../v5/handler/CommMsgHandlerSpec.scala | 27 ++++++++++++++++++++
2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/d1f93bb0/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala
index a0f5adc..03baef9 100644
--- a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala
+++ b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala
@@ -42,22 +42,24 @@ class CommMsgHandler(
{
override def process(kernelMessage: KernelMessage): Future[_] = future {
logKernelMessageAction("Initiating Comm Msg for", kernelMessage)
+
+ val kmBuilder = KMBuilder().withParent(kernelMessage)
+
Utilities.parseAndHandle(
kernelMessage.contentString,
CommMsg.commMsgReads,
- handler = handleCommMsg,
+ handler = handleCommMsg(kmBuilder),
errHandler = handleParseError
)
}
- private def handleCommMsg(commMsg: CommMsg) = {
+ private def handleCommMsg(kmBuilder: KMBuilder)(commMsg: CommMsg) = {
val commId = commMsg.comm_id
val data = commMsg.data
logger.debug(s"Received comm_msg with id '$commId'")
- // TODO: Should we be reusing something from the KernelMessage?
- val commWriter = new KernelCommWriter(actorLoader, KMBuilder(), commId)
+ val commWriter = new KernelCommWriter(actorLoader, kmBuilder, commId)
commStorage.getCommIdCallbacks(commId) match {
case None =>
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/d1f93bb0/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
index 363ab68..582a08e 100644
--- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
+++ b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
@@ -121,6 +121,33 @@ class CommMsgHandlerSpec extends TestKit(
// limit? Is there a different logical approach?
kernelMessageRelayProbe.expectNoMsg(200.milliseconds)
}
+
+ it("should include the parent's header in the parent header of " +
+ "outgoing messages"){
+
+ // Register a callback that sends a message using the comm writer
+ val msgCallback: CommCallbacks.MsgCallback =
+ new CommCallbacks.MsgCallback() {
+ def apply(v1: CommWriter, v2: v5.UUID, v3: v5.MsgData): Unit =
+ v1.writeMsg(MsgData.Empty)
+ }
+ val callbacks = (new CommCallbacks).addMsgCallback(msgCallback)
+ doReturn(Some(callbacks)).when(spyCommStorage)
+ .getCommIdCallbacks(TestCommId)
+
+ // Send a comm_msg message with the test id
+ val msg = kmBuilder
+ .withHeader(CommMsg.toTypeString)
+ .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
+ .build
+ commMsgHandler ! msg
+
+ // Verify that the message sent by the handler has the desired property
+ kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
+ case KernelMessage(_, _, _, parentHeader, _, _) =>
+ parentHeader == msg.header
+ }
+ }
}
}
}