You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by ma...@apache.org on 2016/11/07 14:29:32 UTC
[09/15] incubator-toree git commit: Fixed some minor issues with the
Kernel Info message handling
Fixed some minor issues with the Kernel Info message handling
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/2d3cea40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/2d3cea40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/2d3cea40
Branch: refs/heads/master
Commit: 2d3cea4062135b38210faae47f77e3fffaf51d1d
Parents: e5d8d0f
Author: Marius van Niekerk <ma...@maxpoint.com>
Authored: Thu Oct 20 17:45:41 2016 -0400
Committer: Marius van Niekerk <ma...@maxpoint.com>
Committed: Thu Oct 20 17:51:00 2016 -0400
----------------------------------------------------------------------
.../pyspark/PySparkInterpreter.scala | 38 +++++++++++---------
.../interpreter/sparkr/SparkRInterpreter.scala | 2 +-
.../kernel/interpreter/sql/SqlInterpreter.scala | 2 +-
3 files changed, 23 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2d3cea40/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
index 72b0aa3..6e05975 100644
--- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -37,6 +37,9 @@ import scala.tools.nsc.interpreter.{InputStream, OutputStream}
*/
class PySparkInterpreter(
) extends Interpreter {
+ /** Maximum time to wait for the python kernel to be ready */
+ private val WAIT_DURATION: Long = java.util.concurrent.TimeUnit.SECONDS.toMillis(50)
+
private val PythonExecEnv = "PYTHON_EXEC"
private val logger = LoggerFactory.getLogger(this.getClass)
private var _kernel:KernelLike = _
@@ -73,7 +76,8 @@ class PySparkInterpreter(
/**
* Initializes the interpreter.
- * @param kernel The kernel
+ *
+ * @param kernel The kernel
* @return The newly initialized interpreter
*/
override def init(kernel: KernelLike): Interpreter = {
@@ -83,7 +87,8 @@ class PySparkInterpreter(
/**
* Executes the provided code with the option to silence output.
- * @param code The code to execute
+ *
+ * @param code The code to execute
* @param silent Whether or not to execute the code silently (no output)
* @return The success/failure of the interpretation and the output from the
* execution or the failure
@@ -101,7 +106,8 @@ class PySparkInterpreter(
/**
* Starts the interpreter, initializing any internal state.
- * @return A reference to the interpreter
+ *
+ * @return A reference to the interpreter
*/
override def start(): Interpreter = {
pySparkService.start()
@@ -111,7 +117,8 @@ class PySparkInterpreter(
/**
* Stops the interpreter, removing any previous internal state.
- * @return A reference to the interpreter
+ *
+ * @return A reference to the interpreter
*/
override def stop(): Interpreter = {
pySparkService.stop()
@@ -148,21 +155,18 @@ class PySparkInterpreter(
override def doQuietly[T](body: => T): T = ???
override def languageInfo: LanguageInfo = {
- if (!pySparkService.isRunning) pySparkService.start()
-
- import scala.util.control.Breaks._
- val waitLimit = System.currentTimeMillis() + java.util.concurrent.TimeUnit.SECONDS.toMillis(5)
- while (!pySparkState.isReady) {
- if (System.currentTimeMillis > waitLimit) {
- logger.warn("Timed out waiting for broker state to become ready")
- break
- }
- }
-
- LanguageInfo(
+ if (!pySparkService.isRunning) or (!pySparkState.isReady) {
+ LanguageInfo(
+ "python",
+ version = "UNKNOWN",
+ fileExtension = Some(".py"),
+ pygmentsLexer = Some("python"))
+ } else {
+ LanguageInfo(
"python",
pySparkState.getVersion(),
fileExtension = Some(".py"),
- pygmentsLexer = Some("ipython2"))
+ pygmentsLexer = Some("python"))
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2d3cea40/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
index d6be29b..975dee1 100644
--- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
+++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala
@@ -139,6 +139,6 @@ class SparkRInterpreter(
// Unsupported
override def doQuietly[T](body: => T): T = ???
- override def languageInfo = LanguageInfo("scala", BuildInfo.scalaVersion, fileExtension = Some(".scala"))
+ override def languageInfo = LanguageInfo("R", "Unknown", fileExtension = Some(".R"), pygmentsLexer = Some("r"))
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2d3cea40/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
index b6e272f..df2952b 100644
--- a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
+++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala
@@ -107,6 +107,6 @@ class SqlInterpreter() extends Interpreter {
// Unsupported
override def doQuietly[T](body: => T): T = ???
- override def languageInfo = LanguageInfo("scala", BuildInfo.scalaVersion, fileExtension = Some(".scala"))
+ override def languageInfo = LanguageInfo("sql", BuildInfo.sparkVersion, fileExtension = Some(".sql"), pygmentsLexer = Some("sql"))
}