You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@toree.apache.org by lr...@apache.org on 2016/01/11 22:01:51 UTC

[09/50] [abbrv] incubator-toree git commit: Attach parent header to comm messages that are generated with a comm writer.

Attach parent header to comm messages that are generated with a comm writer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/d1f93bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/d1f93bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/d1f93bb0

Branch: refs/heads/master
Commit: d1f93bb06f0fefafa0d1876e91680f822ddc12d6
Parents: 9db161f
Author: wellecks <we...@gmail.com>
Authored: Mon Nov 16 15:04:34 2015 -0600
Committer: wellecks <we...@gmail.com>
Committed: Mon Nov 16 15:04:34 2015 -0600

----------------------------------------------------------------------
 .../protocol/v5/handler/CommMsgHandler.scala    | 10 +++++---
 .../v5/handler/CommMsgHandlerSpec.scala         | 27 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/d1f93bb0/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala
index a0f5adc..03baef9 100644
--- a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala
+++ b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandler.scala
@@ -42,22 +42,24 @@ class CommMsgHandler(
 {
   override def process(kernelMessage: KernelMessage): Future[_] = future {
     logKernelMessageAction("Initiating Comm Msg for", kernelMessage)
+
+    val kmBuilder = KMBuilder().withParent(kernelMessage)
+
     Utilities.parseAndHandle(
       kernelMessage.contentString,
       CommMsg.commMsgReads,
-      handler = handleCommMsg,
+      handler = handleCommMsg(kmBuilder),
       errHandler = handleParseError
     )
   }
 
-  private def handleCommMsg(commMsg: CommMsg) = {
+  private def handleCommMsg(kmBuilder: KMBuilder)(commMsg: CommMsg) = {
     val commId = commMsg.comm_id
     val data = commMsg.data
 
     logger.debug(s"Received comm_msg with id '$commId'")
 
-    // TODO: Should we be reusing something from the KernelMessage?
-    val commWriter = new KernelCommWriter(actorLoader, KMBuilder(), commId)
+    val commWriter = new KernelCommWriter(actorLoader, kmBuilder, commId)
 
     commStorage.getCommIdCallbacks(commId) match {
       case None             =>

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/d1f93bb0/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
index 363ab68..582a08e 100644
--- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
+++ b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala
@@ -121,6 +121,33 @@ class CommMsgHandlerSpec extends TestKit(
         //       limit? Is there a different logical approach?
         kernelMessageRelayProbe.expectNoMsg(200.milliseconds)
       }
+
+      it("should include the parent's header in the parent header of " +
+         "outgoing messages"){
+
+        // Register a callback that sends a message using the comm writer
+        val msgCallback: CommCallbacks.MsgCallback =
+          new CommCallbacks.MsgCallback() {
+            def apply(v1: CommWriter, v2: v5.UUID, v3: v5.MsgData): Unit =
+              v1.writeMsg(MsgData.Empty)
+          }
+        val callbacks = (new CommCallbacks).addMsgCallback(msgCallback)
+        doReturn(Some(callbacks)).when(spyCommStorage)
+          .getCommIdCallbacks(TestCommId)
+
+        // Send a comm_msg message with the test id
+        val msg = kmBuilder
+          .withHeader(CommMsg.toTypeString)
+          .withContentString(CommMsg(TestCommId, v5.MsgData.Empty))
+          .build
+        commMsgHandler ! msg
+
+        // Verify that the message sent by the handler has the desired property
+        kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
+          case KernelMessage(_, _, _, parentHeader, _, _) =>
+            parentHeader == msg.header
+        }
+      }
     }
   }
 }