You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by ma...@apache.org on 2016/11/07 14:29:32 UTC

[09/15] incubator-toree git commit: Fixed some minor issues with the Kernel Info message handling

Fixed some minor issues with the Kernel Info message handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/2d3cea40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/2d3cea40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/2d3cea40

Branch: refs/heads/master
Commit: 2d3cea4062135b38210faae47f77e3fffaf51d1d
Parents: e5d8d0f
Author: Marius van Niekerk <ma...@maxpoint.com>
Authored: Thu Oct 20 17:45:41 2016 -0400
Committer: Marius van Niekerk <ma...@maxpoint.com>
Committed: Thu Oct 20 17:51:00 2016 -0400

----------------------------------------------------------------------
 .../pyspark/PySparkInterpreter.scala            | 38 +++++++++++---------
 .../interpreter/sparkr/SparkRInterpreter.scala  |  2 +-
 .../kernel/interpreter/sql/SqlInterpreter.scala |  2 +-
 3 files changed, 23 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2d3cea40/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
index 72b0aa3..6e05975 100644
--- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -37,6 +37,9 @@ import scala.tools.nsc.interpreter.{InputStream, OutputStream}
  */
 class PySparkInterpreter(
 ) extends Interpreter {
+  /** Maximum time to wait for the python kernel to be ready */
+  private val WAIT_DURATION: Long = java.util.concurrent.TimeUnit.SECONDS.toMillis(50)
+
   private val PythonExecEnv = "PYTHON_EXEC"
   private val logger = LoggerFactory.getLogger(this.getClass)
   private var _kernel:KernelLike = _
@@ -73,7 +76,8 @@ class PySparkInterpreter(
 
   /**
    * Initializes the interpreter.
-   * @param kernel The kernel
+    *
+    * @param kernel The kernel
    * @return The newly initialized interpreter
    */
   override def init(kernel: KernelLike): Interpreter = {
@@ -83,7 +87,8 @@ class PySparkInterpreter(
 
   /**
    * Executes the provided code with the option to silence output.
-   * @param code The code to execute
+    *
+    * @param code The code to execute
    * @param silent Whether or not to execute the code silently (no output)
    * @return The success/failure of the interpretation and the output from the
    *         execution or the failure
@@ -101,7 +106,8 @@ class PySparkInterpreter(
 
   /**
    * Starts the interpreter, initializing any internal state.
-   * @return A reference to the interpreter
+    *
+    * @return A reference to the interpreter
    */
   override def start(): Interpreter = {
     pySparkService.start()
@@ -111,7 +117,8 @@ class PySparkInterpreter(
 
   /**
    * Stops the interpreter, removing any previous internal state.
-   * @return A reference to the interpreter
+    *
+    * @return A reference to the interpreter
    */
   override def stop(): Interpreter = {
     pySparkService.stop()
@@ -148,21 +155,18 @@ class PySparkInterpreter(
   override def doQuietly[T](body: => T): T = ???
 
   override def languageInfo: LanguageInfo = {
-    if (!pySparkService.isRunning) pySparkService.start()
-
-    import scala.util.control.Breaks._
-    val waitLimit = System.currentTimeMillis() + java.util.concurrent.TimeUnit.SECONDS.toMillis(5)
-    while (!pySparkState.isReady) {
-      if (System.currentTimeMillis > waitLimit) {
-        logger.warn("Timed out waiting for broker state to become ready")
-        break
-      }
-    }
-
-    LanguageInfo(
+    if (!pySparkService.isRunning) or (!pySparkState.isReady) {
+      LanguageInfo(
+        "python",
+        version = "UNKNOWN",
+        fileExtension = Some(".py"),
+        pygmentsLexer = Some("python"))
+    } else {
+      LanguageInfo(
         "python",
         pySparkState.getVersion(),
         fileExtension = Some(".py"),
-        pygmentsLexer = Some("ipython2"))
+        pygmentsLexer = Some("python"))
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2d3cea40/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
index d6be29b..975dee1 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
@@ -139,6 +139,6 @@ class SparkRInterpreter(
   // Unsupported
   override def doQuietly[T](body: => T): T = ???
 
-  override def languageInfo = LanguageInfo("scala", BuildInfo.scalaVersion, fileExtension = Some(".scala"))
+  override def languageInfo = LanguageInfo("R", "Unknown", fileExtension = Some(".R"), pygmentsLexer = Some("r"))
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2d3cea40/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
index b6e272f..df2952b 100644
--- a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
@@ -107,6 +107,6 @@ class SqlInterpreter() extends Interpreter {
   // Unsupported
   override def doQuietly[T](body: => T): T = ???
 
-  override def languageInfo = LanguageInfo("scala", BuildInfo.scalaVersion, fileExtension = Some(".scala"))
+  override def languageInfo = LanguageInfo("sql", BuildInfo.sparkVersion, fileExtension = Some(".sql"), pygmentsLexer = Some("sql"))
 
 }