You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@toree.apache.org by ma...@apache.org on 2016/11/07 14:29:37 UTC

[14/15] incubator-toree git commit: Added proper version grabbing for python and R using subprocess calls.

Added proper version grabbing for python and R using subprocess calls.


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/15327b58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/15327b58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/15327b58

Branch: refs/heads/master
Commit: 15327b589fcd2fcd64c9689a6e4b491e72b643cc
Parents: 9ccf178
Author: Marius van Niekerk <ma...@gmail.com>
Authored: Wed Nov 2 10:21:15 2016 -0400
Committer: Marius van Niekerk <ma...@gmail.com>
Committed: Wed Nov 2 10:21:15 2016 -0400

----------------------------------------------------------------------
 .../toree/interpreter/broker/BrokerState.scala  |  9 +-----
 .../interpreter/broker/BrokerStateSpec.scala    |  6 ++--
 .../main/resources/PySpark/pyspark_runner.py    |  8 ++++--
 .../pyspark/PySparkInterpreter.scala            | 29 ++++++++++----------
 .../interpreter/sparkr/SparkRInterpreter.scala  | 14 +++++++++-
 .../interpreter/sparkr/SparkRProcess.scala      |  4 ++-
 .../interpreter/sparkr/SparkRService.scala      |  3 ++
 7 files changed, 44 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala
index 43ee65c..3d1e3ab 100644
--- a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala
+++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala
@@ -38,7 +38,6 @@ class BrokerState(private val maxQueuedCode: Int) {
   import scala.collection.JavaConverters._
 
   @volatile private var _isReady: Boolean = false
-  @volatile private var _version: String = _
 
   protected val codeQueue: java.util.Queue[BrokerCode] =
     new java.util.concurrent.ConcurrentLinkedQueue[BrokerCode]()
@@ -128,17 +127,10 @@ class BrokerState(private val maxQueuedCode: Int) {
 
-  * @param version The language version used by the broker service
    */
-  def markReady(version: String): Unit = {
+  def markReady(): Unit = {
     _isReady = true
-    _version = version
   }
 
   /**
-  * Retrieve the runtime language version used by the broker service
-  */
-  def getVersion(): String = _version
-
-  /**
    * Marks the specified code as successfully completed using its id.
    *
    * @param codeId The id of the code to mark as a success

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala
index 3617816..43374f8 100644
--- a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala
+++ b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala
@@ -78,7 +78,7 @@ class BrokerStateSpec extends FunSpec with Matchers with OneInstancePerTest {
 
     describe("#isReady") {
       it("should return true if the broker state is marked as ready") {
-        brokerState.markReady("1.0.0")
+        brokerState.markReady()
         brokerState.isReady should be (true)
       }
 
@@ -90,11 +90,11 @@ class BrokerStateSpec extends FunSpec with Matchers with OneInstancePerTest {
     describe("#markReady") {
       it("should mark the state of the broker as ready") {
         // Mark once to make sure that the state gets set
-        brokerState.markReady("1.0.0")
+        brokerState.markReady()
         brokerState.isReady should be (true)
 
         // Mark a second time to ensure that the state does not change
-        brokerState.markReady("1.0.0")
+        brokerState.markReady()
         brokerState.isReady should be (true)
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
index f73805f..e3864f8 100644
--- a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
+++ b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
@@ -15,7 +15,11 @@
 # limitations under the License.
 #
 
-import sys, getopt, traceback, re, ast, platform
+import sys
+import getopt
+import traceback
+import re
+import ast
 
 print("PYTHON::: Starting imports")
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
@@ -53,7 +57,7 @@ java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
 
 bridge = gateway.entry_point
 state = bridge.state()
-state.markReady(platform.python_version())
+state.markReady()
 
 if sparkVersion.startswith("1.2"):
     java_import(gateway.jvm, "org.apache.spark.sql.SparkSession")

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
index a76e1d0..d408217 100644
--- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -41,6 +41,7 @@ class PySparkInterpreter(
   private val WAIT_DURATION: Long = java.util.concurrent.TimeUnit.SECONDS.toMillis(50)
 
   private val PythonExecEnv = "PYTHON_EXEC"
+  private lazy val pythonExecutable = Option(System.getenv(PythonExecEnv)).getOrElse("python")
   private val logger = LoggerFactory.getLogger(this.getClass)
   private var _kernel:KernelLike = _
 
@@ -67,7 +68,7 @@ class PySparkInterpreter(
     )
 
   private lazy val pySparkService = new PySparkService(
-    Option(System.getenv(PythonExecEnv)).getOrElse("python"),
+    pythonExecutable,
     gatewayServer,
     pySparkBridge,
     pySparkProcessHandler
@@ -155,18 +156,36 @@ class PySparkInterpreter(
   override def doQuietly[T](body: => T): T = ???
 
+  /**
+   * Python version reported by `languageInfo`.
+   *
+   * Computed once, on first use, by running `pythonExecutable` in a separate
+   * process. The trailing newline printed by Python is trimmed, and "UNKNOWN"
+   * is reported (with a logged warning) if the executable cannot be run, exits
+   * with a non-zero status or prints nothing, so `languageInfo` does not throw.
+   */
+  private lazy val pythonVersion: String = {
+    import scala.sys.process._
+    import scala.util.{Failure, Success, Try}
+
+    // Ask the interpreter binary directly instead of polling the broker's child process.
+    val versionCommand = Seq(
+      pythonExecutable,
+      "-c",
+      "import sys; print('{s.major}.{s.minor}.{s.micro}'.format(s=sys.version_info))")
+
+    Try(versionCommand.!!.trim).filter(_.nonEmpty) match {
+      case Success(version) => version
+      case Failure(e) =>
+        logger.warn(s"Unable to determine the Python version using '$pythonExecutable'", e)
+        "UNKNOWN"
+    }
+  }
+
   override def languageInfo: LanguageInfo = {
-    if ((!pySparkService.isRunning) || (!pySparkState.isReady)) {
-      LanguageInfo(
-        "python",
-        version = "UNKNOWN",
-        fileExtension = Some(".py"),
-        pygmentsLexer = Some("python"))
-    } else {
-      LanguageInfo(
-        "python",
-        pySparkState.getVersion(),
-        fileExtension = Some(".py"),
-        pygmentsLexer = Some("python"))
-    }
+    LanguageInfo(
+      "python",
+      version = pythonVersion,
+      fileExtension = Some(".py"),
+      pygmentsLexer = Some("python"))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
index 975dee1..54bf14a 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
@@ -38,6 +38,7 @@ class SparkRInterpreter(
 ) extends Interpreter {
   private val logger = LoggerFactory.getLogger(this.getClass)
   private var _kernel: KernelLike = _
+  private val rScriptExecutable = "Rscript"
 
   // TODO: Replace hard-coded maximum queue count
   /** Represents the state used by this interpreter's R instance. */
@@ -61,6 +62,7 @@ class SparkRInterpreter(
     )
 
   private lazy val sparkRService = new SparkRService(
+    rScriptExecutable,
     rBackend,
     sparkRBridge,
     sparkRProcessHandler
@@ -139,6 +141,34 @@ class SparkRInterpreter(
   // Unsupported
   override def doQuietly[T](body: => T): T = ???
 
-  override def languageInfo = LanguageInfo("R", "Unknown", fileExtension = Some(".R"), pygmentsLexer = Some("r"))
+  /**
+   * R version reported by `languageInfo`.
+   *
+   * Computed once, on first use, by running `rScriptExecutable` in a separate
+   * process. The trailing newline written by `cat(..., fill=TRUE)` is trimmed,
+   * and "Unknown" is reported (with a logged warning) if the executable cannot
+   * be run, exits with a non-zero status or prints nothing, so `languageInfo`
+   * does not throw.
+   */
+  private lazy val rVersion: String = {
+    import scala.sys.process._
+    import scala.util.{Failure, Success, Try}
+
+    // Ask the R binary directly instead of polling the broker's child process.
+    val versionCommand = Seq(
+      rScriptExecutable,
+      "-e",
+      "cat(R.version$major, '.', R.version$minor, sep='', fill=TRUE)")
+
+    Try(versionCommand.!!.trim).filter(_.nonEmpty) match {
+      case Success(version) => version
+      case Failure(e) =>
+        logger.warn(s"Unable to determine the R version using '$rScriptExecutable'", e)
+        "Unknown"
+    }
+  }
+
+  override def languageInfo: LanguageInfo =
+    LanguageInfo("R", version = rVersion, fileExtension = Some(".R"), pygmentsLexer = Some("r"))
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
index 0fa453f..d1c145a 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 /**
  * Represents the R process used to evaluate SparkR code.
  *
+ * @param processName The name of the R executable to launch (for example "Rscript")
  * @param sparkRBridge The bridge to use to retrieve kernel output streams
  *                      and the Spark version to be verified
  * @param sparkRProcessHandler The handler to use when the process fails or
@@ -30,11 +31,12 @@ import scala.collection.JavaConverters._
  *             back to the JVM
  */
 class SparkRProcess(
+  processName: String,
   private val sparkRBridge: SparkRBridge,
   private val sparkRProcessHandler: SparkRProcessHandler,
   private val port: Int
 ) extends BrokerProcess(
-  processName = "Rscript",
+  processName = processName,
   entryResource = "kernelR/sparkr_runner.R",
   otherResources = Seq("kernelR/sparkr_runner_utils.R"),
   brokerBridge = sparkRBridge,

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
index f373ab2..350aee0 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala
@@ -29,12 +29,14 @@ import scala.tools.nsc.interpreter._
  * Represents the service that provides the high-level interface between the
  * JVM and R.
  *
+ * @param processName The R executable passed on to the SparkR process (for example "Rscript")
  * @param rBackend The backend to start to communicate between the JVM and R
  * @param sparkRBridge The bridge to use for communication between the JVM and R
  * @param sparkRProcessHandler The handler used for events that occur with the
  *                             SparkR process
  */
 class SparkRService(
+  processName: String,
   private val rBackend: ReflectiveRBackend,
   private val sparkRBridge: SparkRBridge,
   private val sparkRProcessHandler: SparkRProcessHandler
@@ -47,6 +49,7 @@ class SparkRService(
   /** Represents the process used to execute R code via the bridge. */
   private lazy val sparkRProcess: SparkRProcess = {
     val p = new SparkRProcess(
+      processName,
       sparkRBridge,
       sparkRProcessHandler,
       rBackendPort