You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:12 UTC
[04/51] [abbrv] incubator-toree git commit: Moved scala files to new
locations based on new package
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/InspectRequestSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/InspectRequestSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/InspectRequestSpec.scala
new file mode 100644
index 0000000..2ff561d
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/InspectRequestSpec.scala
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+class InspectRequestSpec extends FunSpec with Matchers {
+ val inspectRequestJson: JsValue = Json.parse("""
+ {
+ "code": "<STRING>",
+ "cursor_pos": 999,
+ "detail_level": 1
+ }
+ """)
+
+ val inspectRequest: InspectRequest = InspectRequest(
+ "<STRING>", 999, 1
+ )
+
+ describe("InspectRequest") {
+ describe("#toTypeString") {
+ it("should return correct type") {
+ InspectRequest.toTypeString should be ("inspect_request")
+ }
+ }
+
+ describe("implicit conversions") {
+ it("should implicitly convert from valid json to a InspectRequest instance") {
+ // This is the least safe way to convert as an error is thrown if it fails
+ inspectRequestJson.as[InspectRequest] should be (inspectRequest)
+ }
+
+ it("should also work with asOpt") {
+ // This is safer, but we lose the error information as it returns
+ // None if the conversion fails
+ val newInspectRequest = inspectRequestJson.asOpt[InspectRequest]
+
+ newInspectRequest.get should be (inspectRequest)
+ }
+
+ it("should also work with validate") {
+ // This is the safest as it collects all error information (not just first error) and reports it
+ val InspectRequestResults = inspectRequestJson.validate[InspectRequest]
+
+ InspectRequestResults.fold(
+ (invalid: Seq[(JsPath, Seq[ValidationError])]) => println("Failed!"),
+ (valid: InspectRequest) => valid
+ ) should be (inspectRequest)
+ }
+
+ it("should implicitly convert from a InspectRequest instance to valid json") {
+ Json.toJson(inspectRequest) should be (inspectRequestJson)
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala
new file mode 100644
index 0000000..7d704e2
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+class KernelInfoReplySpec extends FunSpec with Matchers {
+ val kernelInfoReplyJson: JsValue = Json.parse("""
+ {
+ "protocol_version": "x.y.z",
+ "implementation": "<name>",
+ "implementation_version": "z.y.x",
+ "language_info": { "name": "<some language>" },
+ "language_version": "a.b.c",
+ "banner": "<some banner>"
+ }
+ """)
+
+ val kernelInfoReply: KernelInfoReply = KernelInfoReply(
+ "x.y.z", "<name>", "z.y.x", Map("name" -> "<some language>"), "a.b.c", "<some banner>"
+ )
+
+ describe("KernelInfoReply") {
+ describe("#toTypeString") {
+ it("should return correct type") {
+ KernelInfoReply.toTypeString should be ("kernel_info_reply")
+ }
+ }
+
+ describe("implicit conversions") {
+ it("should implicitly convert from valid json to a kernelInfoReply instance") {
+ // This is the least safe way to convert as an error is thrown if it fails
+ kernelInfoReplyJson.as[KernelInfoReply] should be (kernelInfoReply)
+ }
+
+ it("should also work with asOpt") {
+ // This is safer, but we lose the error information as it returns
+ // None if the conversion fails
+ val newKernelInfoReply = kernelInfoReplyJson.asOpt[KernelInfoReply]
+
+ newKernelInfoReply.get should be (kernelInfoReply)
+ }
+
+ it("should also work with validate") {
+ // This is the safest as it collects all error information (not just first error) and reports it
+ val kernelInfoReplyResults = kernelInfoReplyJson.validate[KernelInfoReply]
+
+ kernelInfoReplyResults.fold(
+ (invalid: Seq[(JsPath, Seq[ValidationError])]) => println("Failed!"),
+ (valid: KernelInfoReply) => valid
+ ) should be (kernelInfoReply)
+ }
+
+ it("should implicitly convert from a kernelInfoReply instance to valid json") {
+ Json.toJson(kernelInfoReply) should be (kernelInfoReplyJson)
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoRequestSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoRequestSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoRequestSpec.scala
new file mode 100644
index 0000000..e63b4c3
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoRequestSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+class KernelInfoRequestSpec extends FunSpec with Matchers {
+ val kernelInfoRequestJson: JsValue = Json.parse("""
+ {}
+ """)
+
+ val kernelInfoRequest: KernelInfoRequest = KernelInfoRequest()
+
+ describe("KernelInfoRequest") {
+ describe("#toTypeString") {
+ it("should return correct type") {
+ KernelInfoRequest.toTypeString should be ("kernel_info_request")
+ }
+ }
+
+ describe("implicit conversions") {
+ it("should implicitly convert from valid json to a KernelInfoRequest instance") {
+ // This is the least safe way to convert as an error is thrown if it fails
+ // This is the least safe way to convert as an error is thrown if it fails
+ kernelInfoRequestJson.as[KernelInfoRequest] should be (kernelInfoRequest)
+ }
+
+ it("should also work with asOpt") {
+ // This is safer, but we lose the error information as it returns
+ // None if the conversion fails
+ val newKernelInfoRequest = kernelInfoRequestJson.asOpt[KernelInfoRequest]
+
+ newKernelInfoRequest.get should be (kernelInfoRequest)
+ }
+
+ it("should also work with validate") {
+ // This is the safest as it collects all error information (not just first error) and reports it
+ val KernelInfoRequestResults = kernelInfoRequestJson.validate[KernelInfoRequest]
+
+ KernelInfoRequestResults.fold(
+ (invalid: Seq[(JsPath, Seq[ValidationError])]) => println("Failed!"),
+ (valid: KernelInfoRequest) => valid
+ ) should be (kernelInfoRequest)
+ }
+
+ it("should implicitly convert from a KernelInfoRequest instance to valid json") {
+ Json.toJson(kernelInfoRequest) should be (kernelInfoRequestJson)
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelStatusSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelStatusSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelStatusSpec.scala
new file mode 100644
index 0000000..4663d98
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelStatusSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json.{JsPath, JsValue, Json}
+
+class KernelStatusSpec extends FunSpec with Matchers {
+ val kernelStatusJson: JsValue = Json.parse("""
+ {
+ "execution_state": "<STRING>"
+ }
+ """)
+
+ val kernelStatus: KernelStatus = KernelStatus("<STRING>")
+
+ describe("KernelStatus") {
+ describe("#toTypeString") {
+ it("should return correct type") {
+ KernelStatus.toTypeString should be ("status")
+ }
+ }
+
+ describe("implicit conversions") {
+ it("should implicitly convert from valid json to a kernelStatus instance") {
+ // This is the least safe way to convert as an error is thrown if it fails
+ kernelStatusJson.as[KernelStatus] should be (kernelStatus)
+ }
+
+ it("should also work with asOpt") {
+ // This is safer, but we lose the error information as it returns
+ // None if the conversion fails
+ val newKernelStatus = kernelStatusJson.asOpt[KernelStatus]
+
+ newKernelStatus.get should be (kernelStatus)
+ }
+
+ it("should also work with validate") {
+ // This is the safest as it collects all error information (not just first error) and reports it
+ val kernelStatusResults = kernelStatusJson.validate[KernelStatus]
+
+ kernelStatusResults.fold(
+ (invalid: Seq[(JsPath, Seq[ValidationError])]) => println("Failed!"),
+ (valid: KernelStatus) => valid
+ ) should be (kernelStatus)
+ }
+
+ it("should implicitly convert from a kernelStatus instance to valid json") {
+ Json.toJson(kernelStatus) should be (kernelStatusJson)
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownReplySpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownReplySpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownReplySpec.scala
new file mode 100644
index 0000000..15e8eba
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownReplySpec.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+class ShutdownReplySpec extends FunSpec with Matchers {
+ val shutdownReplyJson: JsValue = Json.parse("""
+ {
+ "restart": true
+ }
+ """)
+
+ val shutdownReply: ShutdownReply = ShutdownReply(
+ true
+ )
+
+ describe("ShutdownReply") {
+ describe("#toTypeString") {
+ it("should return correct type") {
+ ShutdownReply.toTypeString should be ("shutdown_reply")
+ }
+ }
+
+ describe("implicit conversions") {
+ it("should implicitly convert from valid json to a ShutdownReply instance") {
+ // This is the least safe way to convert as an error is thrown if it fails
+ shutdownReplyJson.as[ShutdownReply] should be (shutdownReply)
+ }
+
+ it("should also work with asOpt") {
+ // This is safer, but we lose the error information as it returns
+ // None if the conversion fails
+ val newShutdownReply = shutdownReplyJson.asOpt[ShutdownReply]
+
+ newShutdownReply.get should be (shutdownReply)
+ }
+
+ it("should also work with validate") {
+ // This is the safest as it collects all error information (not just first error) and reports it
+ val ShutdownReplyResults = shutdownReplyJson.validate[ShutdownReply]
+
+ ShutdownReplyResults.fold(
+ (invalid: Seq[(JsPath, Seq[ValidationError])]) => println("Failed!"),
+ (valid: ShutdownReply) => valid
+ ) should be (shutdownReply)
+ }
+
+ it("should implicitly convert from a ShutdownReply instance to valid json") {
+ Json.toJson(shutdownReply) should be (shutdownReplyJson)
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownRequestSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownRequestSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownRequestSpec.scala
new file mode 100644
index 0000000..fdc063d
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/ShutdownRequestSpec.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+class ShutdownRequestSpec extends FunSpec with Matchers {
+ val shutdownRequestJson: JsValue = Json.parse("""
+ {
+ "restart": true
+ }
+ """)
+
+ val shutdownRequest: ShutdownRequest = ShutdownRequest(
+ true
+ )
+
+ describe("ShutdownRequest") {
+ describe("#toTypeString") {
+ it("should return correct type") {
+ ShutdownRequest.toTypeString should be ("shutdown_request")
+ }
+ }
+
+ describe("implicit conversions") {
+ it("should implicitly convert from valid json to a ShutdownRequest instance") {
+ // This is the least safe way to convert as an error is thrown if it fails
+ shutdownRequestJson.as[ShutdownRequest] should be (shutdownRequest)
+ }
+
+ it("should also work with asOpt") {
+ // This is safer, but we lose the error information as it returns
+ // None if the conversion fails
+ val newShutdownRequest = shutdownRequestJson.asOpt[ShutdownRequest]
+
+ newShutdownRequest.get should be (shutdownRequest)
+ }
+
+ it("should also work with validate") {
+ // This is the safest as it collects all error information (not just first error) and reports it
+ val ShutdownRequestResults = shutdownRequestJson.validate[ShutdownRequest]
+
+ ShutdownRequestResults.fold(
+ (invalid: Seq[(JsPath, Seq[ValidationError])]) => println("Failed!"),
+ (valid: ShutdownRequest) => valid
+ ) should be (shutdownRequest)
+ }
+
+ it("should implicitly convert from a ShutdownRequest instance to valid json") {
+ Json.toJson(shutdownRequest) should be (shutdownRequestJson)
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/StreamContentSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/StreamContentSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/StreamContentSpec.scala
new file mode 100644
index 0000000..5c435a8
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/StreamContentSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol.v5.content
+
+import org.scalatest.{FunSpec, Matchers}
+import play.api.data.validation.ValidationError
+import play.api.libs.json._
+
+class StreamContentSpec extends FunSpec with Matchers {
+ val streamJson: JsValue = Json.parse("""
+ {
+ "text": "<STRING>",
+ "name": "<STRING>"
+ }
+ """)
+
+ val stream = StreamContent("<STRING>", "<STRING>")
+
+ describe("StreamContent") {
+ describe("#toTypeString") {
+ it("should return correct type") {
+ StreamContent.toTypeString should be ("stream")
+ }
+ }
+
+ describe("implicit conversions") {
+ it("should implicitly convert from valid json to a StreamContent instance") {
+ // This is the least safe way to convert as an error is thrown if it fails
+ streamJson.as[StreamContent] should be (stream)
+ }
+
+ it("should also work with asOpt") {
+ // This is safer, but we lose the error information as it returns
+ // None if the conversion fails
+ val newCompleteRequest = streamJson.asOpt[StreamContent]
+
+ newCompleteRequest.get should be (stream)
+ }
+
+ it("should also work with validate") {
+ // This is the safest as it collects all error information (not just first error) and reports it
+ val CompleteRequestResults = streamJson.validate[StreamContent]
+
+ CompleteRequestResults.fold(
+ (invalid: Seq[(JsPath, Seq[ValidationError])]) => println("Failed!"),
+ (valid: StreamContent) => valid
+ ) should be (stream)
+ }
+
+ it("should implicitly convert from a StreamContent instance to valid json") {
+ Json.toJson(stream) should be (streamJson)
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
new file mode 100644
index 0000000..0174f9b
--- /dev/null
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.kernel.protocol
+
+//import akka.zeromq.ZMQMessage
+import com.ibm.spark.kernel.protocol.v5._
+import com.ibm.spark.kernel.protocol.v5.content.{CompleteRequest, ExecuteRequest}
+import play.api.libs.json.Json
+
+package object v5Test {
+ // The header for the message
+ val MockHeader : Header = Header("<UUID>","<USER>","<SESSION>",
+ MessageType.Outgoing.ClearOutput.toString, "<VERSION>")
+ // The parent header for the message
+ val MockParenHeader: Header = Header("<PARENT-UUID>","<PARENT-USER>","<PARENT-SESSION>",
+ MessageType.Outgoing.ClearOutput.toString, "<PARENT-VERSION>")
+ // The actual kernel message
+ val MockKernelMessage : KernelMessage = KernelMessage(Seq("<ID>"), "<SIGNATURE>", MockHeader,
+ MockParenHeader, Metadata(), "<CONTENT>")
+ // Use the implicit to convert the KernelMessage to ZMQMessage
+ //val MockZMQMessage : ZMQMessage = MockKernelMessage
+
+ val MockExecuteRequest: ExecuteRequest =
+ ExecuteRequest("spark code", false, true, Map(), false)
+ val MockExecuteRequestKernelMessage = MockKernelMessage.copy(
+ contentString = Json.toJson(MockExecuteRequest).toString
+ )
+ val MockKernelMessageWithBadExecuteRequest = new KernelMessage(
+ Seq[String](), "test message", MockHeader, MockParenHeader, Map[String, String](),
+ """
+ {"code" : 124 }
+ """
+ )
+ val MockCompleteRequest: CompleteRequest = CompleteRequest("", 0)
+ val MockCompleteRequestKernelMessage: KernelMessage = MockKernelMessage.copy(contentString = Json.toJson(MockCompleteRequest).toString)
+ val MockKernelMessageWithBadJSON: KernelMessage = MockKernelMessage.copy(contentString = "inval1d")
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkBridge.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkBridge.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkBridge.scala
deleted file mode 100644
index e0e8f60..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkBridge.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, SQLContextProducerLike, JavaSparkContextProducerLike}
-import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge}
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-
-/**
- * Represents constants for the PySpark bridge.
- */
-object PySparkBridge {
- /** Represents the maximum amount of code that can be queued for Python. */
- val MaxQueuedCode = 500
-
- /**
- * Creates a new PySparkBridge instance.
- *
- * @param brokerState The container of broker state to expose
- * @param kernel The kernel API to expose through the bridge
- *
- * @return The new PySpark bridge
- */
- def apply(
- brokerState: BrokerState,
- kernel: KernelLike
- ): PySparkBridge = {
- new PySparkBridge(
- _brokerState = brokerState,
- _kernel = kernel
- ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
- }
-}
-
-/**
- * Represents the API available to PySpark to act as the bridge for data
- * between the JVM and Python.
- *
- * @param _brokerState The container of broker state to expose
- * @param _kernel The kernel API to expose through the bridge
- */
-class PySparkBridge private (
- private val _brokerState: BrokerState,
- private val _kernel: KernelLike
-) extends BrokerBridge(_brokerState, _kernel) {
- override val brokerName: String = "PySpark"
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkException.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkException.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkException.scala
deleted file mode 100644
index 664c806..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerException
-
-/**
- * Represents a generic PySpark exception.
- *
- * @param message The message to associate with the exception
- */
-class PySparkException(message: String) extends BrokerException(message)
-
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
deleted file mode 100644
index 615ed19..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import java.net.URL
-
-import com.ibm.spark.interpreter.Results.Result
-import com.ibm.spark.interpreter._
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.slf4j.LoggerFactory
-import py4j.GatewayServer
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.tools.nsc.interpreter.{InputStream, OutputStream}
-
-/**
- * Represents an interpreter interface to PySpark. Requires a properly-set
- * SPARK_HOME, PYTHONPATH pointing to Spark's Python source, and py4j installed
- * where it is accessible to the Spark Kernel.
- *
- */
-class PySparkInterpreter(
-) extends Interpreter {
- private val logger = LoggerFactory.getLogger(this.getClass)
- private var _kernel:KernelLike = _
-
- // TODO: Replace hard-coded maximum queue count
- /** Represents the state used by this interpreter's Python instance. */
- private lazy val pySparkState = new PySparkState(500)
-
- /** Represents the bridge used by this interpreter's Python interface. */
- private lazy val pySparkBridge = PySparkBridge(
- pySparkState,
- _kernel
- )
-
-
- /** Represents the interface for Python to talk to JVM Spark components. */
- private lazy val gatewayServer = new GatewayServer(pySparkBridge, 0)
-
- /** Represents the process handler used for the PySpark process. */
- private lazy val pySparkProcessHandler: PySparkProcessHandler =
- new PySparkProcessHandler(
- pySparkBridge,
- restartOnFailure = true,
- restartOnCompletion = true
- )
-
- private lazy val pySparkService = new PySparkService(
- gatewayServer,
- pySparkBridge,
- pySparkProcessHandler
- )
- private lazy val pySparkTransformer = new PySparkTransformer
-
- /**
- * Initializes the interpreter.
- * @param kernel The kernel
- * @return The newly initialized interpreter
- */
- override def init(kernel: KernelLike): Interpreter = {
- _kernel = kernel
- this
- }
-
- // Unsupported (but can be invoked)
- override def bindSparkContext(sparkContext: SparkContext): Unit = {}
-
- // Unsupported (but can be invoked)
- override def bindSqlContext(sqlContext: SQLContext): Unit = {}
-
- /**
- * Executes the provided code with the option to silence output.
- * @param code The code to execute
- * @param silent Whether or not to execute the code silently (no output)
- * @return The success/failure of the interpretation and the output from the
- * execution or the failure
- */
- override def interpret(code: String, silent: Boolean):
- (Result, Either[ExecuteOutput, ExecuteFailure]) =
- {
- if (!pySparkService.isRunning) pySparkService.start()
-
- val futureResult = pySparkTransformer.transformToInterpreterResult(
- pySparkService.submitCode(code)
- )
-
- Await.result(futureResult, Duration.Inf)
- }
-
- /**
- * Starts the interpreter, initializing any internal state.
- * @return A reference to the interpreter
- */
- override def start(): Interpreter = {
- pySparkService.start()
-
- this
- }
-
- /**
- * Stops the interpreter, removing any previous internal state.
- * @return A reference to the interpreter
- */
- override def stop(): Interpreter = {
- pySparkService.stop()
-
- this
- }
-
- /**
- * Returns the class loader used by this interpreter.
- *
- * @return The runtime class loader used by this interpreter
- */
- override def classLoader: ClassLoader = this.getClass.getClassLoader
-
- // Unsupported (but can be invoked)
- override def lastExecutionVariableName: Option[String] = None
-
- // Unsupported (but can be invoked)
- override def read(variableName: String): Option[AnyRef] = None
-
- // Unsupported (but can be invoked)
- override def completion(code: String, pos: Int): (Int, List[String]) =
- (pos, Nil)
-
- // Unsupported
- override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
-
- // Unsupported
- override def classServerURI: String = ""
-
- // Unsupported
- override def interrupt(): Interpreter = ???
-
- // Unsupported
- override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
-
- // Unsupported
- override def addJars(jars: URL*): Unit = ???
-
- // Unsupported
- override def doQuietly[T](body: => T): T = ???
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcess.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcess.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcess.scala
deleted file mode 100644
index ace6635..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcess.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import java.io.{FileOutputStream, File}
-
-import com.ibm.spark.interpreter.broker.BrokerProcess
-import org.apache.commons.exec.environment.EnvironmentUtils
-import org.apache.commons.exec._
-import org.apache.commons.io.IOUtils
-import org.apache.spark.SparkContext
-import org.slf4j.LoggerFactory
-
-/**
- * Represents the Python process used to evaluate PySpark code.
- *
- * @param pySparkBridge The bridge to use to retrieve kernel output streams
- * and the Spark version to be verified
- * @param pySparkProcessHandler The handler to use when the process fails or
- * completes
- * @param port The port to provide to the PySpark process to use to connect
- * back to the JVM
- * @param sparkVersion The version of Spark that the process will be using
- */
-class PySparkProcess(
- private val pySparkBridge: PySparkBridge,
- private val pySparkProcessHandler: PySparkProcessHandler,
- private val port: Int,
- private val sparkVersion: String
-) extends BrokerProcess(
- processName = "python",
- entryResource = "PySpark/pyspark_runner.py",
- otherResources = Nil,
- brokerBridge = pySparkBridge,
- brokerProcessHandler = pySparkProcessHandler,
- arguments = Seq(port.toString, sparkVersion)
-) {
-
- override val brokerName: String = "PySpark"
- private val logger = LoggerFactory.getLogger(this.getClass)
-
- private val sparkHome = Option(System.getenv("SPARK_HOME"))
- .orElse(Option(System.getProperty("spark.home")))
- private val pythonPath = Option(System.getenv("PYTHONPATH"))
-
- assert(sparkHome.nonEmpty, "PySpark process requires Spark Home to be set!")
- if (pythonPath.isEmpty) logger.warn("PYTHONPATH not provided for PySpark!")
-
- /**
- * Creates a new process environment to be used for environment variable
- * retrieval by the new process.
- *
- * @return The map of environment variables and their respective values
- */
- override protected def newProcessEnvironment(): Map[String, String] = {
- val baseEnvironment = super.newProcessEnvironment()
-
- import java.io.File.pathSeparator
-
- val baseSparkHome = sparkHome.get
- val basePythonPath = pythonPath.getOrElse("")
- val updatedPythonPath =
- (basePythonPath.split(pathSeparator) :+ s"$baseSparkHome/python/")
- .map(_.trim)
- .filter(_.nonEmpty)
- .map(new File(_))
- .distinct
- .mkString(pathSeparator)
-
- // Note: Adding the new map values should override the old ones
- baseEnvironment ++ Map(
- "SPARK_HOME" -> baseSparkHome,
- "PYTHONPATH" -> updatedPythonPath
- )
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcessHandler.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcessHandler.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcessHandler.scala
deleted file mode 100644
index ab2aeb1..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkProcessHandler.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerProcessHandler
-
-/**
- * Represents the handler for events triggered by the PySpark process.
- *
- * @param pySparkBridge The bridge to reset when the process fails or completes
- * @param restartOnFailure If true, restarts the process if it fails
- * @param restartOnCompletion If true, restarts the process if it completes
- */
-class PySparkProcessHandler(
- private val pySparkBridge: PySparkBridge,
- private val restartOnFailure: Boolean,
- private val restartOnCompletion: Boolean
- ) extends BrokerProcessHandler(
- pySparkBridge,
- restartOnFailure,
- restartOnCompletion
-) {
- override val brokerName: String = "PySpark"
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkService.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkService.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkService.scala
deleted file mode 100644
index ec264e2..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkService.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerService
-import com.ibm.spark.kernel.interpreter.pyspark.PySparkTypes._
-import org.apache.spark.SparkContext
-import org.slf4j.LoggerFactory
-import py4j.GatewayServer
-
-import scala.concurrent.Future
-
-/**
- * Represents the service that provides the high-level interface between the
- * JVM and Python.
- *
- * @param gatewayServer The backend to start to communicate between the JVM and
- * Python
- * @param pySparkBridge The bridge to use for communication between the JVM and
- * Python
- * @param pySparkProcessHandler The handler used for events that occur with
- * the PySpark process
- */
-class PySparkService(
- private val gatewayServer: GatewayServer,
- private val pySparkBridge: PySparkBridge,
- private val pySparkProcessHandler: PySparkProcessHandler
-) extends BrokerService {
- private val logger = LoggerFactory.getLogger(this.getClass)
- @volatile private var _isRunning: Boolean = false
- override def isRunning: Boolean = _isRunning
-
-
- /** Represents the process used to execute Python code via the bridge. */
- private lazy val pySparkProcess = {
- val p = new PySparkProcess(
- pySparkBridge,
- pySparkProcessHandler,
- gatewayServer.getListeningPort,
- org.apache.spark.SPARK_VERSION
- )
-
- // Update handlers to correctly reset and restart the process
- pySparkProcessHandler.setResetMethod(message => {
- p.stop()
- pySparkBridge.state.reset(message)
- })
- pySparkProcessHandler.setRestartMethod(() => p.start())
-
- p
- }
-
- /** Starts the PySpark service. */
- def start(): Unit = {
- // Start without forking the gateway server (needs to have access to
- // SparkContext in current JVM)
- logger.debug("Starting gateway server")
- gatewayServer.start()
-
- val port = gatewayServer.getListeningPort
- logger.debug(s"Gateway server running on port $port")
-
- // Start the Python process used to execute code
- logger.debug("Launching process to execute Python code")
- pySparkProcess.start()
-
- _isRunning = true
- }
-
- /**
- * Submits code to the PySpark service to be executed and return a result.
- *
- * @param code The code to execute
- *
- * @return The result as a future to eventually return
- */
- def submitCode(code: Code): Future[CodeResults] = {
- pySparkBridge.state.pushCode(code)
- }
-
- /** Stops the running PySpark service. */
- def stop(): Unit = {
- // Stop the Python process used to execute code
- pySparkProcess.stop()
-
- // Stop the server used as an entrypoint for Python
- gatewayServer.shutdown()
-
- _isRunning = false
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkState.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkState.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkState.scala
deleted file mode 100644
index 2d0f63f..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkState.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerState
-
-/**
- * Represents the state structure of PySpark.
- *
- * @param maxQueuedCode The maximum amount of code to support being queued
- * at the same time for PySpark execution
- */
-class PySparkState(private val maxQueuedCode: Int)
- extends BrokerState(maxQueuedCode)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTransformer.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTransformer.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTransformer.scala
deleted file mode 100644
index 146ca8e..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTransformer.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerTransformer
-
-/**
- * Represents a utility that can transform raw PySpark information to
- * kernel information.
- */
-class PySparkTransformer extends BrokerTransformer
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTypes.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTypes.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTypes.scala
deleted file mode 100644
index 1004a1f..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkTypes.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.pyspark
-
-import com.ibm.spark.interpreter.broker.BrokerTypesProvider
-
-/**
- * Represents all types associated with the PySpark interface.
- */
-object PySparkTypes extends BrokerTypesProvider
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/package.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/package.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/package.scala
deleted file mode 100644
index 618a678..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter
-
-import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise}
-
-/**
- * Contains aliases to broker types.
- */
-package object pyspark {
- /**
- * Represents a promise made regarding the completion of PySpark code
- * execution.
- */
- type PySparkPromise = BrokerPromise
-
- /**
- * Represents a block of PyPython code to be evaluated.
- */
- type PySparkCode = BrokerCode
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/com/ibm/spark/magic/builtin/PySpark.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/magic/builtin/PySpark.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/magic/builtin/PySpark.scala
deleted file mode 100644
index a0a79b5..0000000
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/magic/builtin/PySpark.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.magic.builtin
-
-import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
-import com.ibm.spark.kernel.interpreter.pyspark.{PySparkInterpreter, PySparkException}
-import com.ibm.spark.kernel.protocol.v5.MIMEType
-import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
-import com.ibm.spark.magic.dependencies.IncludeKernel
-
-/**
- * Represents the magic interface to use the PySpark interpreter.
- */
-class PySpark extends CellMagic with IncludeKernel {
- override def execute(code: String): CellMagicOutput = {
- val pySpark = kernel.interpreter("PySpark")
-
- if (pySpark.isEmpty || pySpark.get == null)
- throw new PySparkException("PySpark is not available!")
-
- pySpark.get match {
- case pySparkInterpreter: PySparkInterpreter =>
- val (_, output) = pySparkInterpreter.interpret(code)
- output match {
- case Left(executeOutput) =>
- CellMagicOutput(MIMEType.PlainText -> executeOutput)
- case Right(executeFailure) => executeFailure match {
- case executeAborted: ExecuteAborted =>
- throw new PySparkException("PySpark code was aborted!")
- case executeError: ExecuteError =>
- throw new PySparkException(executeError.value)
- }
- }
- case otherInterpreter =>
- val className = otherInterpreter.getClass.getName
- throw new PySparkException(s"Invalid PySpark interpreter: $className")
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
new file mode 100644
index 0000000..e0e8f60
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkBridge.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, SQLContextProducerLike, JavaSparkContextProducerLike}
+import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge}
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+
+/**
+ * Represents constants for the PySpark bridge.
+ */
+object PySparkBridge {
+ /** Represents the maximum amount of code that can be queued for Python. */
+ val MaxQueuedCode = 500
+
+ /**
+ * Creates a new PySparkBridge instance.
+ *
+ * @param brokerState The container of broker state to expose
+ * @param kernel The kernel API to expose through the bridge
+ *
+ * @return The new PySpark bridge
+ */
+ def apply(
+ brokerState: BrokerState,
+ kernel: KernelLike
+ ): PySparkBridge = {
+ new PySparkBridge(
+ _brokerState = brokerState,
+ _kernel = kernel
+ ) with StandardJavaSparkContextProducer with StandardSQLContextProducer
+ }
+}
+
+/**
+ * Represents the API available to PySpark to act as the bridge for data
+ * between the JVM and Python.
+ *
+ * @param _brokerState The container of broker state to expose
+ * @param _kernel The kernel API to expose through the bridge
+ */
+class PySparkBridge private (
+ private val _brokerState: BrokerState,
+ private val _kernel: KernelLike
+) extends BrokerBridge(_brokerState, _kernel) {
+ override val brokerName: String = "PySpark"
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkException.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkException.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkException.scala
new file mode 100644
index 0000000..664c806
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkException.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerException
+
+/**
+ * Represents a generic PySpark exception.
+ *
+ * @param message The message to associate with the exception
+ */
+class PySparkException(message: String) extends BrokerException(message)
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
new file mode 100644
index 0000000..615ed19
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import java.net.URL
+
+import com.ibm.spark.interpreter.Results.Result
+import com.ibm.spark.interpreter._
+import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.slf4j.LoggerFactory
+import py4j.GatewayServer
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.tools.nsc.interpreter.{InputStream, OutputStream}
+
+/**
+ * Represents an interpreter interface to PySpark. Requires a properly-set
+ * SPARK_HOME, PYTHONPATH pointing to Spark's Python source, and py4j installed
+ * where it is accessible to the Spark Kernel.
+ *
+ */
+class PySparkInterpreter(
+) extends Interpreter {
+ private val logger = LoggerFactory.getLogger(this.getClass)
+ private var _kernel:KernelLike = _
+
+ // TODO: Replace hard-coded maximum queue count
+ /** Represents the state used by this interpreter's Python instance. */
+ private lazy val pySparkState = new PySparkState(500)
+
+ /** Represents the bridge used by this interpreter's Python interface. */
+ private lazy val pySparkBridge = PySparkBridge(
+ pySparkState,
+ _kernel
+ )
+
+
+ /** Represents the interface for Python to talk to JVM Spark components. */
+ private lazy val gatewayServer = new GatewayServer(pySparkBridge, 0)
+
+ /** Represents the process handler used for the PySpark process. */
+ private lazy val pySparkProcessHandler: PySparkProcessHandler =
+ new PySparkProcessHandler(
+ pySparkBridge,
+ restartOnFailure = true,
+ restartOnCompletion = true
+ )
+
+ private lazy val pySparkService = new PySparkService(
+ gatewayServer,
+ pySparkBridge,
+ pySparkProcessHandler
+ )
+ private lazy val pySparkTransformer = new PySparkTransformer
+
+ /**
+ * Initializes the interpreter.
+ * @param kernel The kernel
+ * @return The newly initialized interpreter
+ */
+ override def init(kernel: KernelLike): Interpreter = {
+ _kernel = kernel
+ this
+ }
+
+ // Unsupported (but can be invoked)
+ override def bindSparkContext(sparkContext: SparkContext): Unit = {}
+
+ // Unsupported (but can be invoked)
+ override def bindSqlContext(sqlContext: SQLContext): Unit = {}
+
+ /**
+ * Executes the provided code with the option to silence output.
+ * @param code The code to execute
+ * @param silent Whether or not to execute the code silently (no output)
+ * @return The success/failure of the interpretation and the output from the
+ * execution or the failure
+ */
+ override def interpret(code: String, silent: Boolean):
+ (Result, Either[ExecuteOutput, ExecuteFailure]) =
+ {
+ if (!pySparkService.isRunning) pySparkService.start()
+
+ val futureResult = pySparkTransformer.transformToInterpreterResult(
+ pySparkService.submitCode(code)
+ )
+
+ Await.result(futureResult, Duration.Inf)
+ }
+
+ /**
+ * Starts the interpreter, initializing any internal state.
+ * @return A reference to the interpreter
+ */
+ override def start(): Interpreter = {
+ pySparkService.start()
+
+ this
+ }
+
+ /**
+ * Stops the interpreter, removing any previous internal state.
+ * @return A reference to the interpreter
+ */
+ override def stop(): Interpreter = {
+ pySparkService.stop()
+
+ this
+ }
+
+ /**
+ * Returns the class loader used by this interpreter.
+ *
+ * @return The runtime class loader used by this interpreter
+ */
+ override def classLoader: ClassLoader = this.getClass.getClassLoader
+
+ // Unsupported (but can be invoked)
+ override def lastExecutionVariableName: Option[String] = None
+
+ // Unsupported (but can be invoked)
+ override def read(variableName: String): Option[AnyRef] = None
+
+ // Unsupported (but can be invoked)
+ override def completion(code: String, pos: Int): (Int, List[String]) =
+ (pos, Nil)
+
+ // Unsupported
+ override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ???
+
+ // Unsupported
+ override def classServerURI: String = ""
+
+ // Unsupported
+ override def interrupt(): Interpreter = ???
+
+ // Unsupported
+ override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ???
+
+ // Unsupported
+ override def addJars(jars: URL*): Unit = ???
+
+ // Unsupported
+ override def doQuietly[T](body: => T): T = ???
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcess.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcess.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcess.scala
new file mode 100644
index 0000000..ace6635
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcess.scala
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import java.io.{FileOutputStream, File}
+
+import com.ibm.spark.interpreter.broker.BrokerProcess
+import org.apache.commons.exec.environment.EnvironmentUtils
+import org.apache.commons.exec._
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkContext
+import org.slf4j.LoggerFactory
+
+/**
+ * Represents the Python process used to evaluate PySpark code.
+ *
+ * @param pySparkBridge The bridge to use to retrieve kernel output streams
+ * and the Spark version to be verified
+ * @param pySparkProcessHandler The handler to use when the process fails or
+ * completes
+ * @param port The port to provide to the PySpark process to use to connect
+ * back to the JVM
+ * @param sparkVersion The version of Spark that the process will be using
+ */
+class PySparkProcess(
+ private val pySparkBridge: PySparkBridge,
+ private val pySparkProcessHandler: PySparkProcessHandler,
+ private val port: Int,
+ private val sparkVersion: String
+) extends BrokerProcess(
+ processName = "python",
+ entryResource = "PySpark/pyspark_runner.py",
+ otherResources = Nil,
+ brokerBridge = pySparkBridge,
+ brokerProcessHandler = pySparkProcessHandler,
+ arguments = Seq(port.toString, sparkVersion)
+) {
+
+ override val brokerName: String = "PySpark"
+ private val logger = LoggerFactory.getLogger(this.getClass)
+
+ private val sparkHome = Option(System.getenv("SPARK_HOME"))
+ .orElse(Option(System.getProperty("spark.home")))
+ private val pythonPath = Option(System.getenv("PYTHONPATH"))
+
+ assert(sparkHome.nonEmpty, "PySpark process requires Spark Home to be set!")
+ if (pythonPath.isEmpty) logger.warn("PYTHONPATH not provided for PySpark!")
+
+ /**
+ * Creates a new process environment to be used for environment variable
+ * retrieval by the new process.
+ *
+ * @return The map of environment variables and their respective values
+ */
+ override protected def newProcessEnvironment(): Map[String, String] = {
+ val baseEnvironment = super.newProcessEnvironment()
+
+ import java.io.File.pathSeparator
+
+ val baseSparkHome = sparkHome.get
+ val basePythonPath = pythonPath.getOrElse("")
+ val updatedPythonPath =
+ (basePythonPath.split(pathSeparator) :+ s"$baseSparkHome/python/")
+ .map(_.trim)
+ .filter(_.nonEmpty)
+ .map(new File(_))
+ .distinct
+ .mkString(pathSeparator)
+
+ // Note: Adding the new map values should override the old ones
+ baseEnvironment ++ Map(
+ "SPARK_HOME" -> baseSparkHome,
+ "PYTHONPATH" -> updatedPythonPath
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcessHandler.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcessHandler.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcessHandler.scala
new file mode 100644
index 0000000..ab2aeb1
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkProcessHandler.scala
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerProcessHandler
+
+/**
+ * Represents the handler for events triggered by the PySpark process.
+ *
+ * @param pySparkBridge The bridge to reset when the process fails or completes
+ * @param restartOnFailure If true, restarts the process if it fails
+ * @param restartOnCompletion If true, restarts the process if it completes
+ */
+class PySparkProcessHandler(
+ private val pySparkBridge: PySparkBridge,
+ private val restartOnFailure: Boolean,
+ private val restartOnCompletion: Boolean
+ ) extends BrokerProcessHandler(
+ pySparkBridge,
+ restartOnFailure,
+ restartOnCompletion
+) {
+ override val brokerName: String = "PySpark"
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkService.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkService.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkService.scala
new file mode 100644
index 0000000..ec264e2
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkService.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerService
+import com.ibm.spark.kernel.interpreter.pyspark.PySparkTypes._
+import org.apache.spark.SparkContext
+import org.slf4j.LoggerFactory
+import py4j.GatewayServer
+
+import scala.concurrent.Future
+
+/**
+ * Represents the service that provides the high-level interface between the
+ * JVM and Python.
+ *
+ * @param gatewayServer The backend to start to communicate between the JVM and
+ * Python
+ * @param pySparkBridge The bridge to use for communication between the JVM and
+ * Python
+ * @param pySparkProcessHandler The handler used for events that occur with
+ * the PySpark process
+ */
+class PySparkService(
+ private val gatewayServer: GatewayServer,
+ private val pySparkBridge: PySparkBridge,
+ private val pySparkProcessHandler: PySparkProcessHandler
+) extends BrokerService {
+ private val logger = LoggerFactory.getLogger(this.getClass)
+ @volatile private var _isRunning: Boolean = false
+ override def isRunning: Boolean = _isRunning
+
+
+ /** Represents the process used to execute Python code via the bridge. */
+ private lazy val pySparkProcess = {
+ val p = new PySparkProcess(
+ pySparkBridge,
+ pySparkProcessHandler,
+ gatewayServer.getListeningPort,
+ org.apache.spark.SPARK_VERSION
+ )
+
+ // Update handlers to correctly reset and restart the process
+ pySparkProcessHandler.setResetMethod(message => {
+ p.stop()
+ pySparkBridge.state.reset(message)
+ })
+ pySparkProcessHandler.setRestartMethod(() => p.start())
+
+ p
+ }
+
+ /** Starts the PySpark service. */
+ def start(): Unit = {
+ // Start without forking the gateway server (needs to have access to
+ // SparkContext in current JVM)
+ logger.debug("Starting gateway server")
+ gatewayServer.start()
+
+ val port = gatewayServer.getListeningPort
+ logger.debug(s"Gateway server running on port $port")
+
+ // Start the Python process used to execute code
+ logger.debug("Launching process to execute Python code")
+ pySparkProcess.start()
+
+ _isRunning = true
+ }
+
+ /**
+ * Submits code to the PySpark service to be executed and return a result.
+ *
+ * @param code The code to execute
+ *
+ * @return The result as a future to eventually return
+ */
+ def submitCode(code: Code): Future[CodeResults] = {
+ pySparkBridge.state.pushCode(code)
+ }
+
+ /** Stops the running PySpark service. */
+ def stop(): Unit = {
+ // Stop the Python process used to execute code
+ pySparkProcess.stop()
+
+ // Stop the server used as an entrypoint for Python
+ gatewayServer.shutdown()
+
+ _isRunning = false
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkState.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkState.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkState.scala
new file mode 100644
index 0000000..2d0f63f
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkState.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerState
+
+/**
+ * Represents the state structure of PySpark.
+ *
+ * @param maxQueuedCode The maximum amount of code to support being queued
+ * at the same time for PySpark execution
+ */
+class PySparkState(private val maxQueuedCode: Int)
+ extends BrokerState(maxQueuedCode)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTransformer.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTransformer.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTransformer.scala
new file mode 100644
index 0000000..146ca8e
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTransformer.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerTransformer
+
+/**
+ * Represents a utility that can transform raw PySpark information to
+ * kernel information.
+ */
+class PySparkTransformer extends BrokerTransformer
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTypes.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTypes.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTypes.scala
new file mode 100644
index 0000000..1004a1f
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkTypes.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter.pyspark
+
+import com.ibm.spark.interpreter.broker.BrokerTypesProvider
+
+/**
+ * Represents all types associated with the PySpark interface.
+ */
+object PySparkTypes extends BrokerTypesProvider
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/package.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/package.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/package.scala
new file mode 100644
index 0000000..618a678
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/package.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.kernel.interpreter
+
+import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise}
+
+/**
+ * Contains aliases to broker types.
+ */
+package object pyspark {
+ /**
+ * Represents a promise made regarding the completion of PySpark code
+ * execution.
+ */
+ type PySparkPromise = BrokerPromise
+
+ /**
+ * Represents a block of PyPython code to be evaluated.
+ */
+ type PySparkCode = BrokerCode
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/pyspark-interpreter/src/main/scala/org/apache/toree/magic/builtin/PySpark.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/magic/builtin/PySpark.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/magic/builtin/PySpark.scala
new file mode 100644
index 0000000..a0a79b5
--- /dev/null
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/magic/builtin/PySpark.scala
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.spark.magic.builtin
+
+import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted}
+import com.ibm.spark.kernel.interpreter.pyspark.{PySparkInterpreter, PySparkException}
+import com.ibm.spark.kernel.protocol.v5.MIMEType
+import com.ibm.spark.magic.{CellMagicOutput, CellMagic}
+import com.ibm.spark.magic.dependencies.IncludeKernel
+
+/**
+ * Represents the magic interface to use the PySpark interpreter.
+ */
+class PySpark extends CellMagic with IncludeKernel {
+ override def execute(code: String): CellMagicOutput = {
+ val pySpark = kernel.interpreter("PySpark")
+
+ if (pySpark.isEmpty || pySpark.get == null)
+ throw new PySparkException("PySpark is not available!")
+
+ pySpark.get match {
+ case pySparkInterpreter: PySparkInterpreter =>
+ val (_, output) = pySparkInterpreter.interpret(code)
+ output match {
+ case Left(executeOutput) =>
+ CellMagicOutput(MIMEType.PlainText -> executeOutput)
+ case Right(executeFailure) => executeFailure match {
+ case executeAborted: ExecuteAborted =>
+ throw new PySparkException("PySpark code was aborted!")
+ case executeError: ExecuteError =>
+ throw new PySparkException(executeError.value)
+ }
+ }
+ case otherInterpreter =>
+ val className = otherInterpreter.getClass.getName
+ throw new PySparkException(s"Invalid PySpark interpreter: $className")
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaException.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaException.scala b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaException.scala
deleted file mode 100644
index 978cf90..0000000
--- a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.kernel.interpreter.scala
-
-/**
- * Represents a generic Scala exception.
- *
- * @param message The message to associate with the exception
- */
-class ScalaException(message: String) extends Throwable(message)