You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/06/27 06:39:14 UTC
[33/50] [abbrv] incubator-livy git commit: LIVY-313. Fixed SparkRInterpreter always returning success. (#307)
LIVY-313. Fixed SparkRInterpreter always returning success. (#307)
* LIVY-313. Fixed SparkRInterpreter always returning success.
- Stopped redirecting stderr to stdout.
- Continued reading the ErrorStream (previously it was read only once).
- Checked for any errors reported on stderr before returning success.
* Fixing scalastyle check error
* Changing the way errors are handled in SparkRInterpreter
* Fixing scalastyle check error
* Updating SparkRSessionSpec
Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/0de0e286
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/0de0e286
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/0de0e286
Branch: refs/heads/master
Commit: 0de0e28658d5490e289a290b3cdf8a9f12e19eb0
Parents: 5e6f9ed
Author: Jonathan Alter <jo...@users.noreply.github.com>
Authored: Wed Mar 22 23:56:57 2017 -0700
Committer: Jeff Zhang <zj...@gmail.com>
Committed: Thu Mar 23 14:56:57 2017 +0800
----------------------------------------------------------------------
.../cloudera/livy/repl/SparkRInterpreter.scala | 61 ++++++++++++++------
.../livy/repl/SparkRInterpreterSpec.scala | 13 ++---
.../cloudera/livy/repl/SparkRSessionSpec.scala | 8 +--
3 files changed, 53 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0de0e286/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala b/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
index 8e5f3c0..7318b1e 100644
--- a/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
+++ b/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
@@ -37,9 +37,12 @@ import org.json4s.JsonDSL._
import com.cloudera.livy.client.common.ClientConf
import com.cloudera.livy.rsc.RSCConf
+private case class RequestResponse(content: String, error: Boolean)
+
// scalastyle:off println
object SparkRInterpreter {
private val LIVY_END_MARKER = "----LIVY_END_OF_COMMAND----"
+ private val LIVY_ERROR_MARKER = "----LIVY_END_OF_ERROR----"
private val PRINT_MARKER = f"""print("$LIVY_END_MARKER")"""
private val EXPECTED_OUTPUT = f"""[1] "$LIVY_END_MARKER""""
@@ -188,18 +191,25 @@ class SparkRInterpreter(process: Process,
}
try {
- var content: JObject = TEXT_PLAIN -> sendRequest(code)
-
- // If we rendered anything, pass along the last image.
- tempFile.foreach { case file =>
- val bytes = Files.readAllBytes(file)
- if (bytes.nonEmpty) {
- val image = Base64.encodeBase64String(bytes)
- content = content ~ (IMAGE_PNG -> image)
+ val response = sendRequest(code)
+
+ if (response.error) {
+ Interpreter.ExecuteError("Error", response.content)
+ } else {
+ var content: JObject = TEXT_PLAIN -> response.content
+
+ // If we rendered anything, pass along the last image.
+ tempFile.foreach { case file =>
+ val bytes = Files.readAllBytes(file)
+ if (bytes.nonEmpty) {
+ val image = Base64.encodeBase64String(bytes)
+ content = content ~ (IMAGE_PNG -> image)
+ }
}
+
+ Interpreter.ExecuteSuccess(content)
}
- Interpreter.ExecuteSuccess(content)
} catch {
case e: Error =>
Interpreter.ExecuteError("Error", e.output)
@@ -211,14 +221,16 @@ class SparkRInterpreter(process: Process,
}
- private def sendRequest(code: String): String = {
- stdin.println(s"""try(eval(parse(text="${StringEscapeUtils.escapeJava(code)}")))""")
+ private def sendRequest(code: String): RequestResponse = {
+ stdin.println(s"""tryCatch(eval(parse(text="${StringEscapeUtils.escapeJava(code)}"))
+ |,error = function(e) sprintf("%s%s", e, "${LIVY_ERROR_MARKER}"))
+ """.stripMargin)
stdin.flush()
stdin.println(PRINT_MARKER)
stdin.flush()
- readTo(EXPECTED_OUTPUT)
+ readTo(EXPECTED_OUTPUT, LIVY_ERROR_MARKER)
}
override protected def sendShutdownRequest() = {
@@ -242,7 +254,10 @@ class SparkRInterpreter(process: Process,
}
@tailrec
- private def readTo(marker: String, output: StringBuilder = StringBuilder.newBuilder): String = {
+ private def readTo(
+ marker: String,
+ errorMarker: String,
+ output: StringBuilder = StringBuilder.newBuilder): RequestResponse = {
var char = readChar(output)
// Remove any ANSI color codes which match the pattern "\u001b\\[[0-9;]*[mG]".
@@ -259,15 +274,25 @@ class SparkRInterpreter(process: Process,
}
if (output.endsWith(marker)) {
- val result = output.toString()
- result.substring(0, result.length - marker.length)
- .stripPrefix("\n")
- .stripSuffix("\n")
+ var result = stripMarker(output.toString(), marker)
+
+ if (result.endsWith(errorMarker + "\"")) {
+ result = stripMarker(result, "\\n" + errorMarker)
+ RequestResponse(result, error = true)
+ } else {
+ RequestResponse(result, error = false)
+ }
} else {
- readTo(marker, output)
+ readTo(marker, errorMarker, output)
}
}
+ private def stripMarker(result: String, marker: String): String = {
+ result.replace(marker, "")
+ .stripPrefix("\n")
+ .stripSuffix("\n")
+ }
+
private def readChar(output: StringBuilder): Char = {
val byte = stdout.read()
if (byte == -1) {
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0de0e286/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
index e9db106..f4f709f 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
@@ -84,19 +84,18 @@ class SparkRInterpreterSpec extends BaseInterpreterSpec {
it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
val response = interpreter.execute("x")
- response should equal(Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "Error in eval(expr, envir, enclos) : object 'x' not found"
+ response should equal(Interpreter.ExecuteError(
+ "Error",
+ """[1] "Error in eval(expr, envir, enclos): object 'x' not found""""
))
}
it should "not hang when executing incomplete statements" in withInterpreter { interpreter =>
val response = interpreter.execute("x[")
- response should equal(Interpreter.ExecuteSuccess(
- TEXT_PLAIN ->
- """Error in parse(text = "x[") : <text>:2:0: unexpected end of input
- |1: x[
- | ^""".stripMargin
+ response should equal(Interpreter.ExecuteError(
+ "Error",
+ """[1] "Error in parse(text = \"x[\"): <text>:2:0: unexpected end of input\n1: x[\n ^""""
))
}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0de0e286/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
index 5592977..4bbf87c 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
@@ -133,11 +133,11 @@ class SparkRSessionSpec extends BaseSessionSpec {
val result = parse(statement.output)
val expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
+ "status" -> "error",
"execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> "Error in eval(expr, envir, enclos) : object 'x' not found"
- )
+ "ename" -> "Error",
+ "evalue" -> "[1] \"Error in eval(expr, envir, enclos): object 'x' not found\"",
+ "traceback" -> List()
))
result should equal (expectedResult)