You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by ma...@apache.org on 2016/11/07 14:29:30 UTC

[07/15] incubator-toree git commit: Ensuring the broker service is started before LanguageInfo is requested

Ensuring the broker service is started before LanguageInfo is requested


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/e5d8d0f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/e5d8d0f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/e5d8d0f0

Branch: refs/heads/master
Commit: e5d8d0f01565c7eea99cbacb8c86df078a50e8a0
Parents: b3ef81b
Author: Liam Fisk <li...@xtra.co.nz>
Authored: Fri Jul 1 10:06:11 2016 +1200
Committer: Marius van Niekerk <ma...@maxpoint.com>
Committed: Thu Oct 20 17:50:59 2016 -0400

----------------------------------------------------------------------
 .../pyspark/PySparkInterpreter.scala            | 26 ++++++++++++++------
 1 file changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/e5d8d0f0/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
index 632f52d..72b0aa3 100644
--- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -89,8 +89,7 @@ class PySparkInterpreter(
    *         execution or the failure
    */
   override def interpret(code: String, silent: Boolean, output: Option[OutputStream]):
-    (Result, Either[ExecuteOutput, ExecuteFailure]) =
-  {
+    (Result, Either[ExecuteOutput, ExecuteFailure]) = {
     if (!pySparkService.isRunning) pySparkService.start()
 
     val futureResult = pySparkTransformer.transformToInterpreterResult(
@@ -148,11 +147,22 @@ class PySparkInterpreter(
   // Unsupported
   override def doQuietly[T](body: => T): T = ???
 
-  // TODO Identify how to plumb python version to here
-  override def languageInfo = LanguageInfo(
-    "python",
-    pySparkState.getVersion(),
-    fileExtension = Some(".py"),
-    pygmentsLexer = Some("ipython2"))
+  override def languageInfo: LanguageInfo = {
+    if (!pySparkService.isRunning) pySparkService.start()
 
+    import scala.util.control.Breaks._
+    val waitLimit = System.currentTimeMillis() + java.util.concurrent.TimeUnit.SECONDS.toMillis(5)
+    while (!pySparkState.isReady) {
+      if (System.currentTimeMillis > waitLimit) {
+        logger.warn("Timed out waiting for broker state to become ready")
+        break
+      }
+    }
+
+    LanguageInfo(
+        "python",
+        pySparkState.getVersion(),
+        fileExtension = Some(".py"),
+        pygmentsLexer = Some("ipython2"))
+  }
 }