You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by ma...@apache.org on 2016/11/07 14:29:37 UTC
[14/15] incubator-toree git commit: Added proper version grabbing for python and R using subprocess calls.
Added proper version grabbing for python and R using subprocess calls.
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/15327b58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/15327b58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/15327b58
Branch: refs/heads/master
Commit: 15327b589fcd2fcd64c9689a6e4b491e72b643cc
Parents: 9ccf178
Author: Marius van Niekerk <ma...@gmail.com>
Authored: Wed Nov 2 10:21:15 2016 -0400
Committer: Marius van Niekerk <ma...@gmail.com>
Committed: Wed Nov 2 10:21:15 2016 -0400
----------------------------------------------------------------------
.../toree/interpreter/broker/BrokerState.scala | 9 +-----
.../interpreter/broker/BrokerStateSpec.scala | 6 ++--
.../main/resources/PySpark/pyspark_runner.py | 8 ++++--
.../pyspark/PySparkInterpreter.scala | 29 ++++++++++----------
.../interpreter/sparkr/SparkRInterpreter.scala | 14 +++++++++-
.../interpreter/sparkr/SparkRProcess.scala | 4 ++-
.../interpreter/sparkr/SparkRService.scala | 3 ++
7 files changed, 44 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala
index 43ee65c..3d1e3ab 100644
--- a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala
+++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala
@@ -38,7 +38,6 @@ class BrokerState(private val maxQueuedCode: Int) {
import scala.collection.JavaConverters._
@volatile private var _isReady: Boolean = false
- @volatile private var _version: String = _
protected val codeQueue: java.util.Queue[BrokerCode] =
new java.util.concurrent.ConcurrentLinkedQueue[BrokerCode]()
@@ -128,17 +127,11 @@ class BrokerState(private val maxQueuedCode: Int) {
* @param version The language version used by the broker service
*/
- def markReady(version: String): Unit = {
+ def markReady(): Unit = {
_isReady = true
- _version = version
}
/**
- * Retrieve the runtime language version used by the broker service
- */
- def getVersion(): String = _version
-
- /**
* Marks the specified code as successfully completed using its id.
*
* @param codeId The id of the code to mark as a success
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala
index 3617816..43374f8 100644
--- a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala
+++ b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala
@@ -78,7 +78,7 @@ class BrokerStateSpec extends FunSpec with Matchers with OneInstancePerTest {
describe("#isReady") {
it("should return true if the broker state is marked as ready") {
- brokerState.markReady("1.0.0")
+ brokerState.markReady()
brokerState.isReady should be (true)
}
@@ -90,11 +90,11 @@ class BrokerStateSpec extends FunSpec with Matchers with OneInstancePerTest {
describe("#markReady") {
it("should mark the state of the broker as ready") {
// Mark once to make sure that the state gets set
- brokerState.markReady("1.0.0")
+ brokerState.markReady()
brokerState.isReady should be (true)
// Mark a second time to ensure that the state does not change
- brokerState.markReady("1.0.0")
+ brokerState.markReady()
brokerState.isReady should be (true)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
index f73805f..e3864f8 100644
--- a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
+++ b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
@@ -15,7 +15,11 @@
# limitations under the License.
#
-import sys, getopt, traceback, re, ast, platform
+import sys
+import getopt
+import traceback
+import re
+import ast
print("PYTHON::: Starting imports")
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
@@ -53,7 +57,7 @@ java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
bridge = gateway.entry_point
state = bridge.state()
-state.markReady(platform.python_version())
+state.markReady()
if sparkVersion.startswith("1.2"):
java_import(gateway.jvm, "org.apache.spark.sql.SparkSession")
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
index a76e1d0..d408217 100644
--- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -41,6 +41,7 @@ class PySparkInterpreter(
private val WAIT_DURATION: Long = java.util.concurrent.TimeUnit.SECONDS.toMillis(50)
private val PythonExecEnv = "PYTHON_EXEC"
+ private lazy val pythonExecutable = Option(System.getenv(PythonExecEnv)).getOrElse("python")
private val logger = LoggerFactory.getLogger(this.getClass)
private var _kernel:KernelLike = _
@@ -67,7 +68,7 @@ class PySparkInterpreter(
)
private lazy val pySparkService = new PySparkService(
- Option(System.getenv(PythonExecEnv)).getOrElse("python"),
+ pythonExecutable,
gatewayServer,
pySparkBridge,
pySparkProcessHandler
@@ -155,18 +156,18 @@ class PySparkInterpreter(
override def doQuietly[T](body: => T): T = ???
override def languageInfo: LanguageInfo = {
- if ((!pySparkService.isRunning) || (!pySparkState.isReady)) {
- LanguageInfo(
- "python",
- version = "UNKNOWN",
- fileExtension = Some(".py"),
- pygmentsLexer = Some("python"))
- } else {
- LanguageInfo(
- "python",
- pySparkState.getVersion(),
- fileExtension = Some(".py"),
- pygmentsLexer = Some("python"))
- }
+ import scala.sys.process._
+
+ // Issue a subprocess call to grab the python version. This is better than polling a child process.
+ val version = Seq(
+ pythonExecutable,
+ "-c",
+ "import sys; print('{s.major}.{s.minor}.{s.micro}'.format(s=sys.version_info))").!!
+
+ LanguageInfo(
+ "python",
+ version = version,
+ fileExtension = Some(".py"),
+ pygmentsLexer = Some("python"))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
index 975dee1..54bf14a 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
@@ -38,6 +38,7 @@ class SparkRInterpreter(
) extends Interpreter {
private val logger = LoggerFactory.getLogger(this.getClass)
private var _kernel: KernelLike = _
+ private val rScriptExecutable = "Rscript"
// TODO: Replace hard-coded maximum queue count
/** Represents the state used by this interpreter's R instance. */
@@ -61,6 +62,7 @@ class SparkRInterpreter(
)
private lazy val sparkRService = new SparkRService(
+ rScriptExecutable,
rBackend,
sparkRBridge,
sparkRProcessHandler
@@ -139,6 +141,16 @@ class SparkRInterpreter(
// Unsupported
override def doQuietly[T](body: => T): T = ???
- override def languageInfo = LanguageInfo("R", "Unknown", fileExtension = Some(".R"), pygmentsLexer = Some("r"))
+ override def languageInfo = {
+ import sys.process._
+
+ // Issue a subprocess call to grab the R version. This is better than polling a child process.
+ val version = Seq(
+ rScriptExecutable,
+ "-e",
+ "cat(R.version$major, '.', R.version$minor, sep='', fill=TRUE)").!!
+
+ LanguageInfo("R", version = version, fileExtension = Some(".R"), pygmentsLexer = Some("r"))
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
index 0fa453f..d1c145a 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
/**
* Represents the R process used to evaluate SparkR code.
*
+ * @param processName The name of the Rscript process to run.
* @param sparkRBridge The bridge to use to retrieve kernel output streams
* and the Spark version to be verified
* @param sparkRProcessHandler The handler to use when the process fails or
@@ -30,11 +31,12 @@ import scala.collection.JavaConverters._
* back to the JVM
*/
class SparkRProcess(
+ processName: String,
private val sparkRBridge: SparkRBridge,
private val sparkRProcessHandler: SparkRProcessHandler,
private val port: Int
) extends BrokerProcess(
- processName = "Rscript",
+ processName = processName,
entryResource = "kernelR/sparkr_runner.R",
otherResources = Seq("kernelR/sparkr_runner_utils.R"),
brokerBridge = sparkRBridge,
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
index f373ab2..350aee0 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
@@ -29,12 +29,14 @@ import scala.tools.nsc.interpreter._
* Represents the service that provides the high-level interface between the
* JVM and R.
*
+ * @param processName The name of the Rscript process to run.
* @param rBackend The backend to start to communicate between the JVM and R
* @param sparkRBridge The bridge to use for communication between the JVM and R
* @param sparkRProcessHandler The handler used for events that occur with the
* SparkR process
*/
class SparkRService(
+ processName: String,
private val rBackend: ReflectiveRBackend,
private val sparkRBridge: SparkRBridge,
private val sparkRProcessHandler: SparkRProcessHandler
@@ -47,6 +49,7 @@ class SparkRService(
/** Represents the process used to execute R code via the bridge. */
private lazy val sparkRProcess: SparkRProcess = {
val p = new SparkRProcess(
+ processName,
sparkRBridge,
sparkRProcessHandler,
rBackendPort