You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/06/27 06:39:14 UTC

[33/50] [abbrv] incubator-livy git commit: LIVY-313. Fixed SparkRInterpreter always returning success. (#307)

LIVY-313. Fixed SparkRInterpreter always returning success. (#307)

* LIVY-313. Fixed SparkRInterpreter always returning success.

- Stop redirecting stderr to stdout.
- Keep reading the ErrorStream continuously (previously it was read only once).
- Check for any errors reported on stderr before returning success.

* Fixing scalastyle check error

* Changing the way errors are handled in SparkRInterpreter

* Fixing scalastyle check error

* Updating SparkRSessionSpec


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/0de0e286
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/0de0e286
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/0de0e286

Branch: refs/heads/master
Commit: 0de0e28658d5490e289a290b3cdf8a9f12e19eb0
Parents: 5e6f9ed
Author: Jonathan Alter <jo...@users.noreply.github.com>
Authored: Wed Mar 22 23:56:57 2017 -0700
Committer: Jeff Zhang <zj...@gmail.com>
Committed: Thu Mar 23 14:56:57 2017 +0800

----------------------------------------------------------------------
 .../cloudera/livy/repl/SparkRInterpreter.scala  | 61 ++++++++++++++------
 .../livy/repl/SparkRInterpreterSpec.scala       | 13 ++---
 .../cloudera/livy/repl/SparkRSessionSpec.scala  |  8 +--
 3 files changed, 53 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0de0e286/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala b/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
index 8e5f3c0..7318b1e 100644
--- a/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
+++ b/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
@@ -37,9 +37,12 @@ import org.json4s.JsonDSL._
 import com.cloudera.livy.client.common.ClientConf
 import com.cloudera.livy.rsc.RSCConf
 
+private case class RequestResponse(content: String, error: Boolean)
+
 // scalastyle:off println
 object SparkRInterpreter {
   private val LIVY_END_MARKER = "----LIVY_END_OF_COMMAND----"
+  private val LIVY_ERROR_MARKER = "----LIVY_END_OF_ERROR----"
   private val PRINT_MARKER = f"""print("$LIVY_END_MARKER")"""
   private val EXPECTED_OUTPUT = f"""[1] "$LIVY_END_MARKER""""
 
@@ -188,18 +191,25 @@ class SparkRInterpreter(process: Process,
     }
 
     try {
-      var content: JObject = TEXT_PLAIN -> sendRequest(code)
-
-      // If we rendered anything, pass along the last image.
-      tempFile.foreach { case file =>
-        val bytes = Files.readAllBytes(file)
-        if (bytes.nonEmpty) {
-          val image = Base64.encodeBase64String(bytes)
-          content = content ~ (IMAGE_PNG -> image)
+      val response = sendRequest(code)
+
+      if (response.error) {
+        Interpreter.ExecuteError("Error", response.content)
+      } else {
+        var content: JObject = TEXT_PLAIN -> response.content
+
+        // If we rendered anything, pass along the last image.
+        tempFile.foreach { case file =>
+          val bytes = Files.readAllBytes(file)
+          if (bytes.nonEmpty) {
+            val image = Base64.encodeBase64String(bytes)
+            content = content ~ (IMAGE_PNG -> image)
+          }
         }
+
+        Interpreter.ExecuteSuccess(content)
       }
 
-      Interpreter.ExecuteSuccess(content)
     } catch {
       case e: Error =>
         Interpreter.ExecuteError("Error", e.output)
@@ -211,14 +221,16 @@ class SparkRInterpreter(process: Process,
 
   }
 
-  private def sendRequest(code: String): String = {
-    stdin.println(s"""try(eval(parse(text="${StringEscapeUtils.escapeJava(code)}")))""")
+  private def sendRequest(code: String): RequestResponse = {
+    stdin.println(s"""tryCatch(eval(parse(text="${StringEscapeUtils.escapeJava(code)}"))
+                     |,error = function(e) sprintf("%s%s", e, "${LIVY_ERROR_MARKER}"))
+                  """.stripMargin)
     stdin.flush()
 
     stdin.println(PRINT_MARKER)
     stdin.flush()
 
-    readTo(EXPECTED_OUTPUT)
+    readTo(EXPECTED_OUTPUT, LIVY_ERROR_MARKER)
   }
 
   override protected def sendShutdownRequest() = {
@@ -242,7 +254,10 @@ class SparkRInterpreter(process: Process,
   }
 
   @tailrec
-  private def readTo(marker: String, output: StringBuilder = StringBuilder.newBuilder): String = {
+  private def readTo(
+      marker: String,
+      errorMarker: String,
+      output: StringBuilder = StringBuilder.newBuilder): RequestResponse = {
     var char = readChar(output)
 
     // Remove any ANSI color codes which match the pattern "\u001b\\[[0-9;]*[mG]".
@@ -259,15 +274,25 @@ class SparkRInterpreter(process: Process,
     }
 
     if (output.endsWith(marker)) {
-      val result = output.toString()
-      result.substring(0, result.length - marker.length)
-        .stripPrefix("\n")
-        .stripSuffix("\n")
+      var result = stripMarker(output.toString(), marker)
+
+      if (result.endsWith(errorMarker + "\"")) {
+        result = stripMarker(result, "\\n" + errorMarker)
+        RequestResponse(result, error = true)
+      } else {
+        RequestResponse(result, error = false)
+      }
     } else {
-      readTo(marker, output)
+      readTo(marker, errorMarker, output)
     }
   }
 
+  private def stripMarker(result: String, marker: String): String = {
+    result.replace(marker, "")
+      .stripPrefix("\n")
+      .stripSuffix("\n")
+  }
+
   private def readChar(output: StringBuilder): Char = {
     val byte = stdout.read()
     if (byte == -1) {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0de0e286/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
index e9db106..f4f709f 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
@@ -84,19 +84,18 @@ class SparkRInterpreterSpec extends BaseInterpreterSpec {
 
   it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
     val response = interpreter.execute("x")
-    response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "Error in eval(expr, envir, enclos) : object 'x' not found"
+    response should equal(Interpreter.ExecuteError(
+      "Error",
+      """[1] "Error in eval(expr, envir, enclos): object 'x' not found""""
     ))
   }
 
 
   it should "not hang when executing incomplete statements" in withInterpreter { interpreter =>
     val response = interpreter.execute("x[")
-    response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN ->
-        """Error in parse(text = "x[") : <text>:2:0: unexpected end of input
-          |1: x[
-          |   ^""".stripMargin
+    response should equal(Interpreter.ExecuteError(
+      "Error",
+        """[1] "Error in parse(text = \"x[\"): <text>:2:0: unexpected end of input\n1: x[\n   ^""""
     ))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0de0e286/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
index 5592977..4bbf87c 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
@@ -133,11 +133,11 @@ class SparkRSessionSpec extends BaseSessionSpec {
 
     val result = parse(statement.output)
     val expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
+      "status" -> "error",
       "execution_count" -> 0,
-      "data" -> Map(
-        "text/plain" -> "Error in eval(expr, envir, enclos) : object 'x' not found"
-      )
+      "ename" -> "Error",
+      "evalue" -> "[1] \"Error in eval(expr, envir, enclos): object 'x' not found\"",
+      "traceback" -> List()
     ))
 
     result should equal (expectedResult)