You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:12 UTC

[04/51] [abbrv] incubator-toree git commit: Moved scala files to new locations based on new package

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/InspectRequestSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/InspectRequestSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/InspectRequestSpec.scala
new file mode 100644
index 0000000..2ff561d
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/InspectRequestSpec.scala
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+/**
+ * Checks the type string reported by InspectRequest and that an InspectRequest
+ * converts to and from its JSON representation.
+ */
+class InspectRequestSpec extends FunSpec with Matchers {
+  val inspectRequestJson: JsValue = Json.parse("""
+  {
+    "code": "<STRING>",
+    "cursor_pos": 999,
+    "detail_level": 1
+  }
+  """)
+
+  val inspectRequest: InspectRequest = InspectRequest(
+    "<STRING>", 999, 1
+  )
+
+  describe("InspectRequest") {
+    describe("#toTypeString") {
+      it("should return correct type") {
+        InspectRequest.toTypeString should be ("inspect_request")
+      }
+    }
+
+    describe("implicit conversions") {
+      it("should implicitly convert from valid json to a InspectRequest instance") {
+        // Least safe conversion: an error is thrown if it fails
+        inspectRequestJson.as[InspectRequest] should be (inspectRequest)
+      }
+
+      it("should also work with asOpt") {
+        // Safer, but the error information is lost: a failed conversion only
+        // yields None, which is reported here as a mismatch against Some(...)
+        inspectRequestJson.asOpt[InspectRequest] should be (Some(inspectRequest))
+      }
+
+      it("should also work with validate") {
+        // Safest conversion: every error is collected, so fail the test with all of them
+        inspectRequestJson.validate[InspectRequest].fold(
+          (invalid: Seq[(JsPath, Seq[ValidationError])]) => fail(s"Validation failed: $invalid"),
+          (valid: InspectRequest) => valid
+        ) should be (inspectRequest)
+      }
+
+      it("should implicitly convert from a InspectRequest instance to valid json") {
+        Json.toJson(inspectRequest) should be (inspectRequestJson)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala
new file mode 100644
index 0000000..7d704e2
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+/**
+ * Checks the type string reported by KernelInfoReply and that a KernelInfoReply
+ * converts to and from its JSON representation.
+ */
+class KernelInfoReplySpec extends FunSpec with Matchers {
+  val kernelInfoReplyJson: JsValue = Json.parse("""
+  {
+    "protocol_version": "x.y.z",
+    "implementation": "<name>",
+    "implementation_version": "z.y.x",
+    "language_info": { "name": "<some language>" },
+    "language_version": "a.b.c",
+    "banner": "<some banner>"
+  }
+  """)
+
+  val kernelInfoReply: KernelInfoReply = KernelInfoReply(
+    "x.y.z", "<name>", "z.y.x", Map("name" -> "<some language>"), "a.b.c", "<some banner>"
+  )
+
+  describe("KernelInfoReply") {
+    describe("#toTypeString") {
+      it("should return correct type") {
+        KernelInfoReply.toTypeString should be ("kernel_info_reply")
+      }
+    }
+
+    describe("implicit conversions") {
+      it("should implicitly convert from valid json to a kernelInfoReply instance") {
+        // Least safe conversion: an error is thrown if it fails
+        kernelInfoReplyJson.as[KernelInfoReply] should be (kernelInfoReply)
+      }
+
+      it("should also work with asOpt") {
+        // Safer, but the error information is lost: a failed conversion only
+        // yields None, which is reported here as a mismatch against Some(...)
+        kernelInfoReplyJson.asOpt[KernelInfoReply] should be (Some(kernelInfoReply))
+      }
+
+      it("should also work with validate") {
+        // Safest conversion: every error is collected, so fail the test with all of them
+        kernelInfoReplyJson.validate[KernelInfoReply].fold(
+          (invalid: Seq[(JsPath, Seq[ValidationError])]) => fail(s"Validation failed: $invalid"),
+          (valid: KernelInfoReply) => valid
+        ) should be (kernelInfoReply)
+      }
+
+      it("should implicitly convert from a kernelInfoReply instance to valid json") {
+        Json.toJson(kernelInfoReply) should be (kernelInfoReplyJson)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoRequestSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoRequestSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoRequestSpec.scala
new file mode 100644
index 0000000..e63b4c3
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoRequestSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+/**
+ * Checks the type string reported by KernelInfoRequest and that a
+ * KernelInfoRequest converts to and from its JSON representation.
+ */
+class KernelInfoRequestSpec extends FunSpec with Matchers {
+  // KernelInfoRequest() takes no arguments; its expected JSON form is the empty object
+  val kernelInfoRequestJson: JsValue = Json.parse("""
+  {}
+  """)
+
+  val kernelInfoRequest: KernelInfoRequest = KernelInfoRequest()
+
+  describe("KernelInfoRequest") {
+    describe("#toTypeString") {
+      it("should return correct type") {
+        KernelInfoRequest.toTypeString should be ("kernel_info_request")
+      }
+    }
+
+    describe("implicit conversions") {
+      it("should implicitly convert from valid json to a KernelInfoRequest instance") {
+        // Least safe conversion: an error is thrown if it fails
+        kernelInfoRequestJson.as[KernelInfoRequest] should be (kernelInfoRequest)
+      }
+
+      it("should also work with asOpt") {
+        // Safer, but the error information is lost: a failed conversion only
+        // yields None, which is reported here as a mismatch against Some(...)
+        kernelInfoRequestJson.asOpt[KernelInfoRequest] should be (Some(kernelInfoRequest))
+      }
+
+      it("should also work with validate") {
+        // Safest conversion: every error is collected, so fail the test with all of them
+        kernelInfoRequestJson.validate[KernelInfoRequest].fold(
+          (invalid: Seq[(JsPath, Seq[ValidationError])]) => fail(s"Validation failed: $invalid"),
+          (valid: KernelInfoRequest) => valid
+        ) should be (kernelInfoRequest)
+      }
+
+      it("should implicitly convert from a KernelInfoRequest instance to valid json") {
+        Json.toJson(kernelInfoRequest) should be (kernelInfoRequestJson)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelStatusSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelStatusSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelStatusSpec.scala
new file mode 100644
index 0000000..4663d98
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelStatusSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json.{JsPath, JsValue, Json}
+
+/**
+ * Checks the type string reported by KernelStatus and that a KernelStatus
+ * converts to and from its JSON representation.
+ */
+class KernelStatusSpec extends FunSpec with Matchers {
+  val kernelStatusJson: JsValue = Json.parse("""
+  {
+    "execution_state": "<STRING>"
+  }
+  """)
+
+  val kernelStatus: KernelStatus = KernelStatus("<STRING>")
+
+  describe("KernelStatus") {
+    describe("#toTypeString") {
+      it("should return correct type") {
+        KernelStatus.toTypeString should be ("status")
+      }
+    }
+
+    describe("implicit conversions") {
+      it("should implicitly convert from valid json to a kernelStatus instance") {
+        // Least safe conversion: an error is thrown if it fails
+        kernelStatusJson.as[KernelStatus] should be (kernelStatus)
+      }
+
+      it("should also work with asOpt") {
+        // Safer, but the error information is lost: a failed conversion only
+        // yields None, which is reported here as a mismatch against Some(...)
+        kernelStatusJson.asOpt[KernelStatus] should be (Some(kernelStatus))
+      }
+
+      it("should also work with validate") {
+        // Safest conversion: every error is collected, so fail the test with all of them
+        kernelStatusJson.validate[KernelStatus].fold(
+          (invalid: Seq[(JsPath, Seq[ValidationError])]) => fail(s"Validation failed: $invalid"),
+          (valid: KernelStatus) => valid
+        ) should be (kernelStatus)
+      }
+
+      it("should implicitly convert from a kernelStatus instance to valid json") {
+        Json.toJson(kernelStatus) should be (kernelStatusJson)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownReplySpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownReplySpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownReplySpec.scala
new file mode 100644
index 0000000..15e8eba
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownReplySpec.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+/**
+ * Checks the type string reported by ShutdownReply and that a ShutdownReply
+ * converts to and from its JSON representation.
+ */
+class ShutdownReplySpec extends FunSpec with Matchers {
+  val shutdownReplyJson: JsValue = Json.parse("""
+  {
+    "restart": true
+  }
+  """)
+
+  val shutdownReply: ShutdownReply = ShutdownReply(
+    true
+  )
+
+  describe("ShutdownReply") {
+    describe("#toTypeString") {
+      it("should return correct type") {
+        ShutdownReply.toTypeString should be ("shutdown_reply")
+      }
+    }
+
+    describe("implicit conversions") {
+      it("should implicitly convert from valid json to a ShutdownReply instance") {
+        // Least safe conversion: an error is thrown if it fails
+        shutdownReplyJson.as[ShutdownReply] should be (shutdownReply)
+      }
+
+      it("should also work with asOpt") {
+        // Safer, but the error information is lost: a failed conversion only
+        // yields None, which is reported here as a mismatch against Some(...)
+        shutdownReplyJson.asOpt[ShutdownReply] should be (Some(shutdownReply))
+      }
+
+      it("should also work with validate") {
+        // Safest conversion: every error is collected, so fail the test with all of them
+        shutdownReplyJson.validate[ShutdownReply].fold(
+          (invalid: Seq[(JsPath, Seq[ValidationError])]) => fail(s"Validation failed: $invalid"),
+          (valid: ShutdownReply) => valid
+        ) should be (shutdownReply)
+      }
+
+      it("should implicitly convert from a ShutdownReply instance to valid json") {
+        Json.toJson(shutdownReply) should be (shutdownReplyJson)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownRequestSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownRequestSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownRequestSpec.scala
new file mode 100644
index 0000000..fdc063d
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownRequestSpec.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+/**
+ * Checks the type string reported by ShutdownRequest and that a ShutdownRequest
+ * converts to and from its JSON representation.
+ */
+class ShutdownRequestSpec extends FunSpec with Matchers {
+  val shutdownRequestJson: JsValue = Json.parse("""
+  {
+    "restart": true
+  }
+  """)
+
+  val shutdownRequest: ShutdownRequest = ShutdownRequest(
+    true
+  )
+
+  describe("ShutdownRequest") {
+    describe("#toTypeString") {
+      it("should return correct type") {
+        ShutdownRequest.toTypeString should be ("shutdown_request")
+      }
+    }
+
+    describe("implicit conversions") {
+      it("should implicitly convert from valid json to a ShutdownRequest instance") {
+        // Least safe conversion: an error is thrown if it fails
+        shutdownRequestJson.as[ShutdownRequest] should be (shutdownRequest)
+      }
+
+      it("should also work with asOpt") {
+        // Safer, but the error information is lost: a failed conversion only
+        // yields None, which is reported here as a mismatch against Some(...)
+        shutdownRequestJson.asOpt[ShutdownRequest] should be (Some(shutdownRequest))
+      }
+
+      it("should also work with validate") {
+        // Safest conversion: every error is collected, so fail the test with all of them
+        shutdownRequestJson.validate[ShutdownRequest].fold(
+          (invalid: Seq[(JsPath, Seq[ValidationError])]) => fail(s"Validation failed: $invalid"),
+          (valid: ShutdownRequest) => valid
+        ) should be (shutdownRequest)
+      }
+
+      it("should implicitly convert from a ShutdownRequest instance to valid json") {
+        Json.toJson(shutdownRequest) should be (shutdownRequestJson)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/StreamContentSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/StreamContentSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/StreamContentSpec.scala
new file mode 100644
index 0000000..5c435a8
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/StreamContentSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+/**
+ * Checks the type string reported by StreamContent and that a StreamContent
+ * converts to and from its JSON representation.
+ */
+class StreamContentSpec extends FunSpec with Matchers {
+  val streamJson: JsValue = Json.parse("""
+  {
+    "text": "<STRING>",
+    "name": "<STRING>"
+  }
+  """)
+
+  val stream: StreamContent = StreamContent("<STRING>", "<STRING>")
+
+  describe("StreamContent") {
+    describe("#toTypeString") {
+      it("should return correct type") {
+        StreamContent.toTypeString should be ("stream")
+      }
+    }
+
+    describe("implicit conversions") {
+      it("should implicitly convert from valid json to a StreamContent instance") {
+        // Least safe conversion: an error is thrown if it fails
+        streamJson.as[StreamContent] should be (stream)
+      }
+
+      it("should also work with asOpt") {
+        // Safer, but the error information is lost: a failed conversion only
+        // yields None, which is reported here as a mismatch against Some(...)
+        streamJson.asOpt[StreamContent] should be (Some(stream))
+      }
+
+      it("should also work with validate") {
+        // Safest conversion: every error is collected, so fail the test with all of them
+        streamJson.validate[StreamContent].fold(
+          (invalid: Seq[(JsPath, Seq[ValidationError])]) => fail(s"Validation failed: $invalid"),
+          (valid: StreamContent) => valid
+        ) should be (stream)
+      }
+
+      it("should implicitly convert from a StreamContent instance to valid json") {
+        Json.toJson(stream) should be (streamJson)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
new file mode 100644
index 0000000..0174f9b
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol
+
+//import akka.zeromq.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.content.{CompleteRequest, ExecuteRequest}
+import play.api.libs.json.Json
+
+package object v5Test {
+  // Placeholder header attached to every mock kernel message in this object
+  val MockHeader: Header = Header("<UUID>", "<USER>", "<SESSION>",
+    MessageType.Outgoing.ClearOutput.toString, "<VERSION>")
+  // Placeholder parent header attached to every mock kernel message in this object
+  val MockParenHeader: Header = Header("<PARENT-UUID>", "<PARENT-USER>",
+    "<PARENT-SESSION>", MessageType.Outgoing.ClearOutput.toString, "<PARENT-VERSION>")
+  // Base kernel message; several mocks below are copies of it with different content
+  val MockKernelMessage: KernelMessage = KernelMessage(
+    Seq("<ID>"), "<SIGNATURE>", MockHeader, MockParenHeader, Metadata(), "<CONTENT>")
+  //  Use the implicit to convert the KernelMessage to ZMQMessage
+  //val MockZMQMessage : ZMQMessage = MockKernelMessage
+
+  // Execute request and a kernel message carrying its JSON form as content
+  val MockExecuteRequest: ExecuteRequest = ExecuteRequest("spark code", false, true, Map(), false)
+  val MockExecuteRequestKernelMessage: KernelMessage =
+    MockKernelMessage.copy(contentString = Json.toJson(MockExecuteRequest).toString)
+  // Kernel message whose content gives "code" as a number (124)
+  val MockKernelMessageWithBadExecuteRequest: KernelMessage = new KernelMessage(
+    Seq.empty[String], "test message", MockHeader, MockParenHeader, Map.empty[String, String], """
+        {"code" : 124 }
+    """)
+  val MockCompleteRequest: CompleteRequest = CompleteRequest("", 0)
+  val MockCompleteRequestKernelMessage: KernelMessage =
+    MockKernelMessage.copy(contentString = Json.toJson(MockCompleteRequest).toString)
+  // Kernel message whose content is the literal text "inval1d"
+  val MockKernelMessageWithBadJSON: KernelMessage = MockKernelMessage.copy(contentString = "inval1d")
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkBridge.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkBridge.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkBridge.scala
deleted file mode 100644
index e0e8f60..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkBridge.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, SQLContextProducerLike, JavaSparkContextProducerLike}
-import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge}
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-
-/**
- * Represents constants for the PySpark bridge.
- */
-object PySparkBridge {
-  /** Represents the maximum amount of code that can be queued for Python. */
-  val MaxQueuedCode = 500
-
-  /**
-   * Creates a new PySparkBridge instance.
-   *
-   * @param brokerState The container of broker state to expose
-   * @param kernel The kernel API to expose through the bridge
-   *
-   * @return The new PySpark bridge
-   */
-  def apply(
-    brokerState: BrokerState,
-    kernel: KernelLike
-  ): PySparkBridge = {
-    new PySparkBridge(
-      _brokerState = brokerState,
-      _kernel = kernel
-    ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
-  }
-}
-
-/**
- * Represents the API available to PySpark to act as the bridge for data
- * between the JVM and Python.
- *
- * @param _brokerState The container of broker state to expose
- * @param _kernel The kernel API to expose through the bridge
- */
-class PySparkBridge private (
-  private val _brokerState: BrokerState,
-  private val _kernel: KernelLike
-) extends BrokerBridge(_brokerState, _kernel) {
-  override val brokerName: String = "PySpark"
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkException.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkException.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkException.scala
deleted file mode 100644
index 664c806..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerException
-
-/**
- * Represents a generic PySpark exception.
- *
- * @param message The message to associate with the exception
- */
-class PySparkException(message: String) extends BrokerException(message)
-

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
deleted file mode 100644
index 615ed19..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import java.net.URL
-
-import com.ibm.spark.interpreter.Results.Result
-import com.ibm.spark.interpreter._
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.slf4j.LoggerFactory
-import py4j.GatewayServer
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.tools.nsc.interpreter.{InputStream, OutputStream}
-
-/**
- * Represents an interpreter interface to PySpark. Requires a properly-set
- * SPARK_HOME, PYTHONPATH pointing to Spark's Python source, and py4j installed
- * where it is accessible to the Spark Kernel.
- *
- */
-class PySparkInterpreter(
-) extends Interpreter {
-  private val logger = LoggerFactory.getLogger(this.getClass)
-  private var _kernel:KernelLike = _
-
-  // TODO: Replace hard-coded maximum queue count
-  /** Represents the state used by this interpreter's Python instance. */
-  private lazy val pySparkState = new PySparkState(500)
-
-  /** Represents the bridge used by this interpreter's Python interface. */
-  private lazy val pySparkBridge = PySparkBridge(
-    pySparkState,
-    _kernel
-  )
-
-
-  /** Represents the interface for Python to talk to JVM Spark components. */
-  private lazy val gatewayServer = new GatewayServer(pySparkBridge, 0)
-
-  /** Represents the process handler used for the PySpark process. */
-  private lazy val pySparkProcessHandler: PySparkProcessHandler =
-    new PySparkProcessHandler(
-      pySparkBridge,
-      restartOnFailure = true,
-      restartOnCompletion = true
-    )
-
-  private lazy val pySparkService = new PySparkService(
-    gatewayServer,
-    pySparkBridge,
-    pySparkProcessHandler
-  )
-  private lazy val pySparkTransformer = new PySparkTransformer
-
-  /**
-   * Initializes the interpreter.
-   * @param kernel The kernel
-   * @return The newly initialized interpreter
-   */
-  override def init(kernel: KernelLike): Interpreter = {
-    _kernel = kernel
-    this
-  }
-
-  // Unsupported (but can be invoked)
-  override def bindSparkContext(sparkContext: SparkContext): Unit = {}
-
-  // Unsupported (but can be invoked)
-  override def bindSqlContext(sqlContext: SQLContext): Unit = {}
-
-  /**
-   * Executes the provided code with the option to silence output.
-   * @param code The code to execute
-   * @param silent Whether or not to execute the code silently (no output)
-   * @return The success/failure of the interpretation and the output from the
-   *         execution or the failure
-   */
-  override def interpret(code: String, silent: Boolean):
-    (Result, Either[ExecuteOutput, ExecuteFailure]) =
-  {
-    if (!pySparkService.isRunning) pySparkService.start()
-
-    val futureResult = pySparkTransformer.transformToInterpreterResult(
-      pySparkService.submitCode(code)
-    )
-
-    Await.result(futureResult, Duration.Inf)
-  }
-
-  /**
-   * Starts the interpreter, initializing any internal state.
-   * @return A reference to the interpreter
-   */
-  override def start(): Interpreter = {
-    pySparkService.start()
-
-    this
-  }
-
-  /**
-   * Stops the interpreter, removing any previous internal state.
-   * @return A reference to the interpreter
-   */
-  override def stop(): Interpreter = {
-    pySparkService.stop()
-
-    this
-  }
-
-  /**
-   * Returns the class loader used by this interpreter.
-   *
-   * @return The runtime class loader used by this interpreter
-   */
-  override def classLoader: ClassLoader = this.getClass.getClassLoader
-
-  // Unsupported (but can be invoked)
-  override def lastExecutionVariableName: Option[String] = None
-
-  // Unsupported (but can be invoked)
-  override def read(variableName: String): Option[AnyRef] = None
-
-  // Unsupported (but can be invoked)
-  override def completion(code: String, pos: Int): (Int, List[String]) =
-    (pos, Nil)
-
-  // Unsupported
-  override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
-
-  // Unsupported
-  override def classServerURI: String = ""
-
-  // Unsupported
-  override def interrupt(): Interpreter = ???
-
-  // Unsupported
-  override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
-
-  // Unsupported
-  override def addJars(jars: URL*): Unit = ???
-
-  // Unsupported
-  override def doQuietly[T](body: => T): T = ???
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcess.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcess.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcess.scala
deleted file mode 100644
index ace6635..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcess.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import java.io.{FileOutputStream, File}
-
-import com.ibm.spark.interpreter.broker.BrokerProcess
-import org.apache.commons.exec.environment.EnvironmentUtils
-import org.apache.commons.exec._
-import org.apache.commons.io.IOUtils
-import org.apache.spark.SparkContext
-import org.slf4j.LoggerFactory
-
-/**
- * Represents the Python process used to evaluate PySpark code.
- *
- * @param pySparkBridge The bridge to use to retrieve kernel output streams
- *                      and the Spark version to be verified
- * @param pySparkProcessHandler The handler to use when the process fails or
- *                              completes
- * @param port The port to provide to the PySpark process to use to connect
- *             back to the JVM
- * @param sparkVersion The version of Spark that the process will be using
- */
-class PySparkProcess(
-  private val pySparkBridge: PySparkBridge,
-  private val pySparkProcessHandler: PySparkProcessHandler,
-  private val port: Int,
-  private val sparkVersion: String
-) extends BrokerProcess(
-  processName = "python",
-  entryResource = "PySpark/pyspark_runner.py",
-  otherResources = Nil,
-  brokerBridge = pySparkBridge,
-  brokerProcessHandler = pySparkProcessHandler,
-  arguments = Seq(port.toString, sparkVersion)
-) {
-
-  override val brokerName: String = "PySpark"
-  private val logger = LoggerFactory.getLogger(this.getClass)
-
-  private val sparkHome = Option(System.getenv("SPARK_HOME"))
-    .orElse(Option(System.getProperty("spark.home")))
-  private val pythonPath = Option(System.getenv("PYTHONPATH"))
-
-  assert(sparkHome.nonEmpty, "PySpark process requires Spark Home to be set!")
-  if (pythonPath.isEmpty) logger.warn("PYTHONPATH not provided for PySpark!")
-
-  /**
-   * Creates a new process environment to be used for environment variable
-   * retrieval by the new process.
-   *
-   * @return The map of environment variables and their respective values
-   */
-  override protected def newProcessEnvironment(): Map[String, String] = {
-    val baseEnvironment = super.newProcessEnvironment()
-
-    import java.io.File.pathSeparator
-
-    val baseSparkHome = sparkHome.get
-    val basePythonPath = pythonPath.getOrElse("")
-    val updatedPythonPath =
-      (basePythonPath.split(pathSeparator) :+ s"$baseSparkHome/python/")
-        .map(_.trim)
-        .filter(_.nonEmpty)
-        .map(new File(_))
-        .distinct
-        .mkString(pathSeparator)
-
-    // Note: Adding the new map values should override the old ones
-    baseEnvironment ++ Map(
-      "SPARK_HOME" -> baseSparkHome,
-      "PYTHONPATH" -> updatedPythonPath
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcessHandler.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcessHandler.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcessHandler.scala
deleted file mode 100644
index ab2aeb1..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcessHandler.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerProcessHandler
-
-/**
- * Represents the handler for events triggered by the PySpark process.
- *
- * @param pySparkBridge The bridge to reset when the process fails or completes
- * @param restartOnFailure If true, restarts the process if it fails
- * @param restartOnCompletion If true, restarts the process if it completes
- */
-class PySparkProcessHandler(
-  private val pySparkBridge: PySparkBridge,
-  private val restartOnFailure: Boolean,
-  private val restartOnCompletion: Boolean
-  ) extends BrokerProcessHandler(
-  pySparkBridge,
-  restartOnFailure,
-  restartOnCompletion
-) {
-  override val brokerName: String = "PySpark"
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkService.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkService.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkService.scala
deleted file mode 100644
index ec264e2..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkService.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerService
-import com.ibm.spark.kernel.interpreter.pyspark.PySparkTypes._
-import org.apache.spark.SparkContext
-import org.slf4j.LoggerFactory
-import py4j.GatewayServer
-
-import scala.concurrent.Future
-
-/**
- * Represents the service that provides the high-level interface between the
- * JVM and Python.
- *
- * @param gatewayServer The backend to start to communicate between the JVM and
- *                      Python
- * @param pySparkBridge The bridge to use for communication between the JVM and
- *                      Python
- * @param pySparkProcessHandler The handler used for events that occur with
- *                              the PySpark process
- */
-class PySparkService(
-  private val gatewayServer: GatewayServer,
-  private val pySparkBridge: PySparkBridge,
-  private val pySparkProcessHandler: PySparkProcessHandler
-) extends BrokerService {
-  private val logger = LoggerFactory.getLogger(this.getClass)
-  @volatile private var _isRunning: Boolean = false
-  override def isRunning: Boolean = _isRunning
-
-
-  /** Represents the process used to execute Python code via the bridge. */
-  private lazy val pySparkProcess = {
-    val p = new PySparkProcess(
-      pySparkBridge,
-      pySparkProcessHandler,
-      gatewayServer.getListeningPort,
-      org.apache.spark.SPARK_VERSION
-    )
-
-    // Update handlers to correctly reset and restart the process
-    pySparkProcessHandler.setResetMethod(message => {
-      p.stop()
-      pySparkBridge.state.reset(message)
-    })
-    pySparkProcessHandler.setRestartMethod(() => p.start())
-
-    p
-  }
-
-  /** Starts the PySpark service. */
-  def start(): Unit = {
-    // Start without forking the gateway server (needs to have access to
-    // SparkContext in current JVM)
-    logger.debug("Starting gateway server")
-    gatewayServer.start()
-
-    val port = gatewayServer.getListeningPort
-    logger.debug(s"Gateway server running on port $port")
-
-    // Start the Python process used to execute code
-    logger.debug("Launching process to execute Python code")
-    pySparkProcess.start()
-
-    _isRunning = true
-  }
-
-  /**
-   * Submits code to the PySpark service to be executed and return a result.
-   *
-   * @param code The code to execute
-   *
-   * @return The result as a future to eventually return
-   */
-  def submitCode(code: Code): Future[CodeResults] = {
-    pySparkBridge.state.pushCode(code)
-  }
-
-  /** Stops the running PySpark service. */
-  def stop(): Unit = {
-    // Stop the Python process used to execute code
-    pySparkProcess.stop()
-
-    // Stop the server used as an entrypoint for Python
-    gatewayServer.shutdown()
-
-    _isRunning = false
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkState.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkState.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkState.scala
deleted file mode 100644
index 2d0f63f..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkState.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerState
-
-/**
- * Represents the state structure of PySpark.
- *
- * @param maxQueuedCode The maximum amount of code to support being queued
- *                      at the same time for PySpark execution
- */
-class PySparkState(private val maxQueuedCode: Int)
-  extends BrokerState(maxQueuedCode)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTransformer.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTransformer.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTransformer.scala
deleted file mode 100644
index 146ca8e..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTransformer.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerTransformer
-
-/**
- * Represents a utility that can transform raw PySpark information to
- * kernel information.
- */
-class PySparkTransformer extends BrokerTransformer

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTypes.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTypes.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTypes.scala
deleted file mode 100644
index 1004a1f..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTypes.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerTypesProvider
-
-/**
- * Represents all types associated with the PySpark interface.
- */
-object PySparkTypes extends BrokerTypesProvider

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/package.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/package.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/package.scala
deleted file mode 100644
index 618a678..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter
-
-import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise}
-
-/**
- * Contains aliases to broker types.
- */
-package object pyspark {
-  /**
-   * Represents a promise made regarding the completion of PySpark code
-   * execution.
-   */
-  type PySparkPromise = BrokerPromise
-
-  /**
-   * Represents a block of PyPython code to be evaluated.
-   */
-  type PySparkCode = BrokerCode
-}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/magic/builtin/PySpark.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/magic/builtin/PySpark.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/magic/builtin/PySpark.scala
deleted file mode 100644
index a0a79b5..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/magic/builtin/PySpark.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.magic.builtin
-
-import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
-import com.ibm.spark.kernel.interpreter.pyspark.{PySparkInterpreter, PySparkException}
-import com.ibm.spark.kernel.protocol.v5.MIMEType
-import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
-import com.ibm.spark.magic.dependencies.IncludeKernel
-
-/**
- * Represents the magic interface to use the PySpark interpreter.
- */
-class PySpark extends CellMagic with IncludeKernel {
-  override def execute(code: String): CellMagicOutput = {
-    val pySpark = kernel.interpreter("PySpark")
-
-    if (pySpark.isEmpty || pySpark.get == null)
-      throw new PySparkException("PySpark is not available!")
-
-    pySpark.get match {
-      case pySparkInterpreter: PySparkInterpreter =>
-        val (_, output) = pySparkInterpreter.interpret(code)
-        output match {
-          case Left(executeOutput) =>
-            CellMagicOutput(MIMEType.PlainText -> executeOutput)
-          case Right(executeFailure) => executeFailure match {
-            case executeAborted: ExecuteAborted =>
-              throw new PySparkException("PySpark code was aborted!")
-            case executeError: ExecuteError =>
-              throw new PySparkException(executeError.value)
-          }
-        }
-      case otherInterpreter =>
-        val className = otherInterpreter.getClass.getName
-        throw new PySparkException(s"Invalid PySpark interpreter: $className")
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
new file mode 100644
index 0000000..e0e8f60
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, SQLContextProducerLike, JavaSparkContextProducerLike}
+import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge}
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+
+/**
+ * Represents constants for the PySpark bridge.
+ */
+object PySparkBridge {
+  /** Represents the maximum amount of code that can be queued for Python. */
+  val MaxQueuedCode = 500
+
+  /**
+   * Creates a new PySparkBridge instance.
+   *
+   * @param brokerState The container of broker state to expose
+   * @param kernel The kernel API to expose through the bridge
+   *
+   * @return The new PySpark bridge
+   */
+  def apply(
+    brokerState: BrokerState,
+    kernel: KernelLike
+  ): PySparkBridge = {
+    new PySparkBridge(
+      _brokerState = brokerState,
+      _kernel = kernel
+    ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
+  }
+}
+
+/**
+ * Represents the API available to PySpark to act as the bridge for data
+ * between the JVM and Python.
+ *
+ * @param _brokerState The container of broker state to expose
+ * @param _kernel The kernel API to expose through the bridge
+ */
+class PySparkBridge private (
+  private val _brokerState: BrokerState,
+  private val _kernel: KernelLike
+) extends BrokerBridge(_brokerState, _kernel) {
+  override val brokerName: String = "PySpark"
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkException.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkException.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkException.scala
new file mode 100644
index 0000000..664c806
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkException.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerException
+
+/**
+ * Represents a generic PySpark exception.
+ *
+ * @param message The message to associate with the exception
+ */
+class PySparkException(message: String) extends BrokerException(message)
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
new file mode 100644
index 0000000..615ed19
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import java.net.URL
+
+import com.ibm.spark.interpreter.Results.Result
+import com.ibm.spark.interpreter._
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.slf4j.LoggerFactory
+import py4j.GatewayServer
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.tools.nsc.interpreter.{InputStream, OutputStream}
+
+/**
+ * Represents an interpreter interface to PySpark. Requires a properly-set
+ * SPARK_HOME, PYTHONPATH pointing to Spark's Python source, and py4j installed
+ * where it is accessible to the Spark Kernel.
+ *
+ */
+class PySparkInterpreter(
+) extends Interpreter {
+  private val logger = LoggerFactory.getLogger(this.getClass)
+  private var _kernel:KernelLike = _
+
+  // TODO: Replace hard-coded maximum queue count
+  /** Represents the state used by this interpreter's Python instance. */
+  private lazy val pySparkState = new PySparkState(500)
+
+  /** Represents the bridge used by this interpreter's Python interface. */
+  private lazy val pySparkBridge = PySparkBridge(
+    pySparkState,
+    _kernel
+  )
+
+
+  /** Represents the interface for Python to talk to JVM Spark components. */
+  private lazy val gatewayServer = new GatewayServer(pySparkBridge, 0)
+
+  /** Represents the process handler used for the PySpark process. */
+  private lazy val pySparkProcessHandler: PySparkProcessHandler =
+    new PySparkProcessHandler(
+      pySparkBridge,
+      restartOnFailure = true,
+      restartOnCompletion = true
+    )
+
+  private lazy val pySparkService = new PySparkService(
+    gatewayServer,
+    pySparkBridge,
+    pySparkProcessHandler
+  )
+  private lazy val pySparkTransformer = new PySparkTransformer
+
+  /**
+   * Initializes the interpreter.
+   * @param kernel The kernel
+   * @return The newly initialized interpreter
+   */
+  override def init(kernel: KernelLike): Interpreter = {
+    _kernel = kernel
+    this
+  }
+
+  // Unsupported (but can be invoked)
+  override def bindSparkContext(sparkContext: SparkContext): Unit = {}
+
+  // Unsupported (but can be invoked)
+  override def bindSqlContext(sqlContext: SQLContext): Unit = {}
+
+  /**
+   * Executes the provided code with the option to silence output.
+   * @param code The code to execute
+   * @param silent Whether or not to execute the code silently (no output)
+   * @return The success/failure of the interpretation and the output from the
+   *         execution or the failure
+   */
+  override def interpret(code: String, silent: Boolean):
+    (Result, Either[ExecuteOutput, ExecuteFailure]) =
+  {
+    if (!pySparkService.isRunning) pySparkService.start()
+
+    val futureResult = pySparkTransformer.transformToInterpreterResult(
+      pySparkService.submitCode(code)
+    )
+
+    Await.result(futureResult, Duration.Inf)
+  }
+
+  /**
+   * Starts the interpreter, initializing any internal state.
+   * @return A reference to the interpreter
+   */
+  override def start(): Interpreter = {
+    pySparkService.start()
+
+    this
+  }
+
+  /**
+   * Stops the interpreter, removing any previous internal state.
+   * @return A reference to the interpreter
+   */
+  override def stop(): Interpreter = {
+    pySparkService.stop()
+
+    this
+  }
+
+  /**
+   * Returns the class loader used by this interpreter.
+   *
+   * @return The runtime class loader used by this interpreter
+   */
+  override def classLoader: ClassLoader = this.getClass.getClassLoader
+
+  // Unsupported (but can be invoked)
+  override def lastExecutionVariableName: Option[String] = None
+
+  // Unsupported (but can be invoked)
+  override def read(variableName: String): Option[AnyRef] = None
+
+  // Unsupported (but can be invoked)
+  override def completion(code: String, pos: Int): (Int, List[String]) =
+    (pos, Nil)
+
+  // Unsupported
+  override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
+
+  // Unsupported
+  override def classServerURI: String = ""
+
+  // Unsupported
+  override def interrupt(): Interpreter = ???
+
+  // Unsupported
+  override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
+
+  // Unsupported
+  override def addJars(jars: URL*): Unit = ???
+
+  // Unsupported
+  override def doQuietly[T](body: => T): T = ???
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcess.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcess.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcess.scala
new file mode 100644
index 0000000..ace6635
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcess.scala
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import java.io.{FileOutputStream, File}
+
+import com.ibm.spark.interpreter.broker.BrokerProcess
+import org.apache.commons.exec.environment.EnvironmentUtils
+import org.apache.commons.exec._
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkContext
+import org.slf4j.LoggerFactory
+
+/**
+ * Represents the Python process used to evaluate PySpark code.
+ *
+ * @param pySparkBridge The bridge to use to retrieve kernel output streams
+ *                      and the Spark version to be verified
+ * @param pySparkProcessHandler The handler to use when the process fails or
+ *                              completes
+ * @param port The port to provide to the PySpark process to use to connect
+ *             back to the JVM
+ * @param sparkVersion The version of Spark that the process will be using
+ */
+class PySparkProcess(
+  private val pySparkBridge: PySparkBridge,
+  private val pySparkProcessHandler: PySparkProcessHandler,
+  private val port: Int,
+  private val sparkVersion: String
+) extends BrokerProcess(
+  processName = "python",
+  entryResource = "PySpark/pyspark_runner.py",
+  otherResources = Nil,
+  brokerBridge = pySparkBridge,
+  brokerProcessHandler = pySparkProcessHandler,
+  arguments = Seq(port.toString, sparkVersion)
+) {
+
+  override val brokerName: String = "PySpark"
+  private val logger = LoggerFactory.getLogger(this.getClass)
+
+  // The SPARK_HOME variable wins; the spark.home property is the fallback
+  private val sparkHome = Option(System.getenv("SPARK_HOME"))
+    .orElse(Option(System.getProperty("spark.home")))
+  private val pythonPath = Option(System.getenv("PYTHONPATH"))
+
+  // Spark home is asserted at construction; a missing PYTHONPATH only warns
+  assert(sparkHome.isDefined, "PySpark process requires Spark Home to be set!")
+  if (pythonPath.isEmpty) logger.warn("PYTHONPATH not provided for PySpark!")
+
+  /**
+   * Builds the environment for the new process: the inherited environment
+   * with SPARK_HOME set and PYTHONPATH extended by Spark's python directory.
+   *
+   * @return The map of environment variables and their respective values
+   */
+  override protected def newProcessEnvironment(): Map[String, String] = {
+    import java.io.File.pathSeparator
+
+    val inherited = super.newProcessEnvironment()
+    val home = sparkHome.get
+
+    // Existing PYTHONPATH entries first, then <home>/python/; entries are
+    // trimmed, blank ones dropped, and duplicates removed once made Files
+    val existing = pythonPath.getOrElse("").split(pathSeparator)
+    val entries = (existing :+ s"$home/python/").collect {
+      case raw if raw.trim.nonEmpty => new File(raw.trim)
+    }
+
+    // Values added here replace any same-named variables that were inherited
+    inherited ++ Map(
+      "SPARK_HOME" -> home,
+      "PYTHONPATH" -> entries.distinct.mkString(pathSeparator)
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcessHandler.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcessHandler.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcessHandler.scala
new file mode 100644
index 0000000..ab2aeb1
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcessHandler.scala
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerProcessHandler
+
+/**
+ * Represents the handler for events triggered by the PySpark process.
+ *
+ * @param pySparkBridge The bridge to reset when the process fails or completes
+ * @param restartOnFailure If true, restarts the process if it fails
+ * @param restartOnCompletion If true, restarts the process if it completes
+ */
+class PySparkProcessHandler(
+  private val pySparkBridge: PySparkBridge,
+  private val restartOnFailure: Boolean,
+  private val restartOnCompletion: Boolean
+  ) extends BrokerProcessHandler( // arguments passed through unchanged
+  pySparkBridge,
+  restartOnFailure,
+  restartOnCompletion
+) {
+  override val brokerName: String = "PySpark" // matches PySparkProcess.brokerName
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkService.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkService.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkService.scala
new file mode 100644
index 0000000..ec264e2
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkService.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerService
+import com.ibm.spark.kernel.interpreter.pyspark.PySparkTypes._
+import org.apache.spark.SparkContext
+import org.slf4j.LoggerFactory
+import py4j.GatewayServer
+
+import scala.concurrent.Future
+
+/**
+ * Represents the service that provides the high-level interface between the
+ * JVM and Python.
+ *
+ * @param gatewayServer The backend to start to communicate between the JVM and
+ *                      Python
+ * @param pySparkBridge The bridge to use for communication between the JVM and
+ *                      Python
+ * @param pySparkProcessHandler The handler used for events that occur with
+ *                              the PySpark process
+ */
+class PySparkService(
+  private val gatewayServer: GatewayServer,
+  private val pySparkBridge: PySparkBridge,
+  private val pySparkProcessHandler: PySparkProcessHandler
+) extends BrokerService {
+  private val logger = LoggerFactory.getLogger(this.getClass)
+  @volatile private var _isRunning: Boolean = false
+  override def isRunning: Boolean = _isRunning
+
+  // Lazy: gatewayServer.getListeningPort is only read on first access
+  /** Represents the process used to execute Python code via the bridge. */
+  private lazy val pySparkProcess = {
+    val p = new PySparkProcess(
+      pySparkBridge,
+      pySparkProcessHandler,
+      gatewayServer.getListeningPort,
+      org.apache.spark.SPARK_VERSION
+    )
+
+    // Update handlers to correctly reset and restart the process
+    pySparkProcessHandler.setResetMethod(message => {
+      p.stop()
+      pySparkBridge.state.reset(message)
+    })
+    pySparkProcessHandler.setRestartMethod(() => p.start())
+
+    p
+  }
+
+  /** Starts the PySpark service. */
+  def start(): Unit = {
+    // Start without forking the gateway server (needs to have access to
+    // SparkContext in current JVM)
+    logger.debug("Starting gateway server")
+    gatewayServer.start()
+
+    val port = gatewayServer.getListeningPort
+    logger.debug(s"Gateway server running on port $port")
+
+    // Start the Python process used to execute code
+    logger.debug("Launching process to execute Python code")
+    pySparkProcess.start()
+
+    _isRunning = true
+  }
+
+  /**
+   * Submits code to the PySpark service to be executed and return a result.
+   *
+   * @param code The code to execute
+   *
+   * @return The result as a future to eventually return
+   */
+  def submitCode(code: Code): Future[CodeResults] = {
+    pySparkBridge.state.pushCode(code)
+  }
+
+  /** Stops the running PySpark service, always shutting down the gateway. */
+  def stop(): Unit = {
+    // Stop the Python process first; the finally block guarantees that a
+    // failure here cannot leave the gateway server (Python's entrypoint) up
+    try pySparkProcess.stop()
+    finally {
+      gatewayServer.shutdown()
+      _isRunning = false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkState.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkState.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkState.scala
new file mode 100644
index 0000000..2d0f63f
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkState.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerState
+
+/**
+ * Represents the state structure of PySpark (BrokerState with no additions).
+ *
+ * @param maxQueuedCode The maximum amount of code to support being queued
+ *                      at the same time for PySpark execution
+ */
+class PySparkState(private val maxQueuedCode: Int)
+  extends BrokerState(maxQueuedCode)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTransformer.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTransformer.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTransformer.scala
new file mode 100644
index 0000000..146ca8e
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTransformer.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerTransformer
+
+/**
+ * Represents a utility that can transform raw PySpark information to
+ * kernel information; all behavior is inherited from BrokerTransformer.
+ */
+class PySparkTransformer extends BrokerTransformer

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTypes.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTypes.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTypes.scala
new file mode 100644
index 0000000..1004a1f
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTypes.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerTypesProvider
+
+/**
+ * Represents all PySpark interface types, inherited from BrokerTypesProvider.
+ */
+object PySparkTypes extends BrokerTypesProvider

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/package.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/package.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/package.scala
new file mode 100644
index 0000000..618a678
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/package.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter
+
+import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise}
+
+/**
+ * Contains aliases to broker types.
+ */
+package object pyspark {
+  /**
+   * Represents a promise made regarding the completion of PySpark code
+   * execution (alias of BrokerPromise).
+   */
+  type PySparkPromise = BrokerPromise
+
+  /**
+   * Represents a block of PySpark code to be evaluated (alias of BrokerCode).
+   */
+  type PySparkCode = BrokerCode
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/magic/builtin/PySpark.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/magic/builtin/PySpark.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/magic/builtin/PySpark.scala
new file mode 100644
index 0000000..a0a79b5
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/magic/builtin/PySpark.scala
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.magic.builtin
+
+import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
+import com.ibm.spark.kernel.interpreter.pyspark.{PySparkInterpreter, PySparkException}
+import com.ibm.spark.kernel.protocol.v5.MIMEType
+import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
+import com.ibm.spark.magic.dependencies.IncludeKernel
+
+/**
+ * Represents the magic interface to use the PySpark interpreter.
+ */
+class PySpark extends CellMagic with IncludeKernel {
+  /**
+   * Runs the cell's code through the kernel interpreter named "PySpark".
+   *
+   * @param code The code contained in the cell
+   * @return The interpreter's output wrapped as plain text
+   */
+  override def execute(code: String): CellMagicOutput =
+    kernel.interpreter("PySpark") match {
+      case Some(python: PySparkInterpreter) =>
+        val (_, outcome) = python.interpret(code)
+        outcome match {
+          case Left(result) =>
+            CellMagicOutput(MIMEType.PlainText -> result)
+          case Right(_: ExecuteAborted) =>
+            throw new PySparkException("PySpark code was aborted!")
+          case Right(failure: ExecuteError) =>
+            throw new PySparkException(failure.value)
+        }
+      case None | Some(null) =>
+        throw new PySparkException("PySpark is not available!")
+      case Some(unexpected) =>
+        val className = unexpected.getClass.getName
+        throw new PySparkException(s"Invalid PySpark interpreter: $className")
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaException.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaException.scala b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaException.scala
deleted file mode 100644
index 978cf90..0000000
--- a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.scala
-
-/**
- * Represents a generic Scala exception.
- *
- * @param message The message to associate with the exception
- */
-class ScalaException(message: String) extends Throwable(message)