You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/11/24 05:31:45 UTC

incubator-livy git commit: [LIVY-299][REPL] Output multiple lines in one statement block

Repository: incubator-livy
Updated Branches:
  refs/heads/master 1f59e102b -> f893d1991


[LIVY-299][REPL] Output multiple lines in one statement block

## What changes were proposed in this pull request?

The Livy Scala interpreter doesn't support outputting multiple lines in one code block, which causes some outputs to be missed, so this change updates the logic to support it.

## How was this patch tested?

Add new unit tests.

Author: jerryshao <ss...@hortonworks.com>

Closes #66 from jerryshao/LIVY-299.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/f893d199
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/f893d199
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/f893d199

Branch: refs/heads/master
Commit: f893d19918beb41bd82169f1c3da88a1fb9e934d
Parents: 1f59e10
Author: jerryshao <ss...@hortonworks.com>
Authored: Fri Nov 24 13:31:38 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Fri Nov 24 13:31:38 2017 +0800

----------------------------------------------------------------------
 .../org/apache/livy/test/InteractiveIT.scala    | 22 ++++++------
 .../livy/repl/AbstractSparkInterpreter.scala    | 32 +++++++++++++++--
 .../livy/repl/PythonInterpreterSpec.scala       | 11 ++++++
 .../apache/livy/repl/ScalaInterpreterSpec.scala | 38 ++++++++++++--------
 .../apache/livy/repl/SharedSessionSpec.scala    |  4 +--
 .../livy/repl/SparkRInterpreterSpec.scala       | 11 ++++++
 .../org/apache/livy/repl/SparkSessionSpec.scala | 12 +++----
 .../interactive/InteractiveSessionSpec.scala    |  2 +-
 8 files changed, 95 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/f893d199/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
index 22ef739..43d66cb 100644
--- a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
+++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
@@ -34,8 +34,8 @@ class InteractiveIT extends BaseIntegrationTestSuite {
   test("basic interactive session") {
     withNewSession(Spark) { s =>
       s.run("val sparkVersion = sc.version").result().left.foreach(info(_))
-      s.run("1+1").verifyResult("res0: Int = 2")
-      s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1")
+      s.run("1+1").verifyResult("res0: Int = 2\n")
+      s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1\n")
       s.run("val sql = new org.apache.spark.sql.SQLContext(sc)").verifyResult(
         ".*" + Pattern.quote(
         "sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext") + ".*")
@@ -46,8 +46,8 @@ class InteractiveIT extends BaseIntegrationTestSuite {
       // Verify Livy internal configurations are not exposed.
       // TODO separate all these checks to different sub tests after merging new IT code.
       s.run("""sc.getConf.getAll.exists(_._1.startsWith("spark.__livy__."))""")
-        .verifyResult(".*false")
-      s.run("""sys.props.exists(_._1.startsWith("spark.__livy__."))""").verifyResult(".*false")
+        .verifyResult(".*false\n")
+      s.run("""sys.props.exists(_._1.startsWith("spark.__livy__."))""").verifyResult(".*false\n")
       s.run("""val str = "str"""")
       s.complete("str.", "scala", 4).verifyContaining(List("compare", "contains"))
       s.complete("str2.", "scala", 5).verifyNone()
@@ -137,16 +137,16 @@ class InteractiveIT extends BaseIntegrationTestSuite {
       // Check is the library loaded in JVM in the proper class loader.
       s.run("Thread.currentThread.getContextClassLoader.loadClass" +
           """("org.codehaus.plexus.util.FileUtils")""")
-        .verifyResult(".*Class\\[_\\] = class org.codehaus.plexus.util.FileUtils")
+        .verifyResult(".*Class\\[_\\] = class org.codehaus.plexus.util.FileUtils\n")
 
       // Check does Scala interpreter see the library.
-      s.run("import org.codehaus.plexus.util._").verifyResult("import org.codehaus.plexus.util._")
+      s.run("import org.codehaus.plexus.util._").verifyResult("import org.codehaus.plexus.util._\n")
 
       // Check does SparkContext see classes defined by Scala interpreter.
-      s.run("case class Item(i: Int)").verifyResult("defined class Item")
+      s.run("case class Item(i: Int)").verifyResult("defined class Item\n")
       s.run("val rdd = sc.parallelize(Array.fill(10){new Item(scala.util.Random.nextInt(1000))})")
         .verifyResult("rdd.*")
-      s.run("rdd.count()").verifyResult(".*= 10")
+      s.run("rdd.count()").verifyResult(".*= 10\n")
     }
   }
 
@@ -164,15 +164,15 @@ class InteractiveIT extends BaseIntegrationTestSuite {
   test("recover interactive session") {
     withNewSession(Spark) { s =>
       val stmt1 = s.run("1")
-      stmt1.verifyResult("res0: Int = 1")
+      stmt1.verifyResult("res0: Int = 1\n")
 
       restartLivy()
 
       // Verify session still exists.
       s.verifySessionIdle()
-      s.run("2").verifyResult("res1: Int = 2")
+      s.run("2").verifyResult("res1: Int = 2\n")
       // Verify statement result is preserved.
-      stmt1.verifyResult("res0: Int = 1")
+      stmt1.verifyResult("res0: Int = 1\n")
 
       s.stop()
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/f893d199/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
index fab8d95..d2ac04f 100644
--- a/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
@@ -262,8 +262,34 @@ abstract class AbstractSparkInterpreter extends Interpreter with Logging {
           case Interpreter.ExecuteError(_, _, _) =>
             result
 
-          case _ =>
-            executeLines(tail, result)
+          case Interpreter.ExecuteAborted(_) =>
+            result
+
+          case Interpreter.ExecuteSuccess(e) =>
+            val mergedRet = resultFromLastLine match {
+              case Interpreter.ExecuteSuccess(s) =>
+                // Because of SparkMagic related specific logic, so we will only merge text/plain
+                // result. For any magic related output, still follow the old way.
+                if (s.values.contains(TEXT_PLAIN) && e.values.contains(TEXT_PLAIN)) {
+                  val lastRet = s.values.getOrElse(TEXT_PLAIN, "").asInstanceOf[String]
+                  val currRet = e.values.getOrElse(TEXT_PLAIN, "").asInstanceOf[String]
+                  if (lastRet.nonEmpty && currRet.nonEmpty) {
+                    Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"$lastRet$currRet")
+                  } else if (lastRet.nonEmpty) {
+                    Interpreter.ExecuteSuccess(TEXT_PLAIN -> lastRet)
+                  } else if (currRet.nonEmpty) {
+                    Interpreter.ExecuteSuccess(TEXT_PLAIN -> currRet)
+                  } else {
+                    result
+                  }
+                } else {
+                  result
+                }
+
+              case _ => result
+            }
+
+            executeLines(tail, mergedRet)
         }
     }
   }
@@ -318,7 +344,7 @@ abstract class AbstractSparkInterpreter extends Interpreter with Logging {
   }
 
   private def readStdout() = {
-    val output = outputStream.toString("UTF-8").trim
+    val output = outputStream.toString("UTF-8")
     outputStream.reset()
 
     output

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/f893d199/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
index fb0704e..4a78c61 100644
--- a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
@@ -65,6 +65,17 @@ abstract class PythonBaseInterpreterSpec extends BaseInterpreterSpec {
     ))
   }
 
+  it should "get multiple outputs in one block" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """
+        |print("1")
+        |print("2")
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "1\n2"
+    ))
+  }
+
   it should "parse a class" in withInterpreter { interpreter =>
     val response = interpreter.execute(
       """

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/f893d199/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
index 5715e4d..a13efeb 100644
--- a/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
@@ -33,24 +33,24 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
   it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
     val response = interpreter.execute("1 + 2")
     response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res0: Int = 3"
+      TEXT_PLAIN -> "res0: Int = 3\n"
     ))
   }
 
   it should "execute multiple statements" in withInterpreter { interpreter =>
     var response = interpreter.execute("val x = 1")
     response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "x: Int = 1"
+      TEXT_PLAIN -> "x: Int = 1\n"
     ))
 
     response = interpreter.execute("val y = 2")
     response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "y: Int = 2"
+      TEXT_PLAIN -> "y: Int = 2\n"
     ))
 
     response = interpreter.execute("x + y")
     response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res0: Int = 3"
+      TEXT_PLAIN -> "res0: Int = 3\n"
     ))
   }
 
@@ -64,7 +64,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
         |x + y
       """.stripMargin)
     response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res2: Int = 3"
+      TEXT_PLAIN -> "x: Int = 1\ny: Int = 2\nres2: Int = 3\n"
     ))
   }
 
@@ -96,14 +96,24 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
       """.stripMargin)
 
     response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res0: Int = 3"
+      TEXT_PLAIN -> "res0: Int = 3\n"
     ))
   }
 
   it should "capture stdout" in withInterpreter { interpreter =>
     val response = interpreter.execute("println(\"Hello World\")")
     response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "Hello World"
+      TEXT_PLAIN -> "Hello World\n"
+    ))
+
+    val resp1 = interpreter.execute("print(1)\nprint(2)")
+    resp1 should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "12"
+    ))
+
+    val resp2 = interpreter.execute("println(1)\nprintln(2)")
+    resp2 should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "1\n2\n"
     ))
   }
 
@@ -123,7 +133,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
       """sc.parallelize(0 to 1).map { i => i+1 }.collect""".stripMargin)
 
     response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res0: Array[Int] = Array(1, 2)"
+      TEXT_PLAIN -> "res0: Array[Int] = Array(1, 2)\n"
     ))
   }
 
@@ -144,7 +154,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
       """val r = 1
         |// comment
       """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1\n"))
 
     response = interpreter.execute(
       """val r = 1
@@ -153,7 +163,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
         |comment
         |*/
       """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1\n"))
 
     // Test statements ending with a mix of single line and multi-line comments
     response = interpreter.execute(
@@ -165,7 +175,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
         |*/
         |// comment
       """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1\n"))
 
     response = interpreter.execute(
       """val r = 1
@@ -175,7 +185,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
         |comment
         |*/
       """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1\n"))
 
     // Make sure incomplete statement is still returned as incomplete statement.
     response = interpreter.execute("sc.")
@@ -195,12 +205,12 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
 
     try {
       response should equal(
-        Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String = \n$stringWithComment"))
+        Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String = \n$stringWithComment\n"))
     } catch {
       case _: Exception =>
         response should equal(
           // Scala 2.11 doesn't have a " " after "="
-          Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String =\n$stringWithComment"))
+          Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String =\n$stringWithComment\n"))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/f893d199/repl/src/test/scala/org/apache/livy/repl/SharedSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SharedSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SharedSessionSpec.scala
index 1edb8a2..18a6db7 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SharedSessionSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SharedSessionSpec.scala
@@ -48,7 +48,7 @@ class SharedSessionSpec extends BaseSessionSpec(Shared) {
       "status" -> "ok",
       "execution_count" -> 0,
       "data" -> Map(
-        "text/plain" -> "res0: Int = 3"
+        "text/plain" -> "res0: Int = 3\n"
       )
     ))
 
@@ -82,7 +82,7 @@ class SharedSessionSpec extends BaseSessionSpec(Shared) {
       "status" -> "ok",
       "execution_count" -> 0,
       "data" -> Map(
-        "text/plain" -> "res0: Array[Int] = Array(1, 2)"
+        "text/plain" -> "res0: Array[Int] = Array(1, 2)\n"
       )
     ))
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/f893d199/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
index e032e15..4976396 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
@@ -77,6 +77,17 @@ class SparkRInterpreterSpec extends BaseInterpreterSpec {
     ))
   }
 
+  it should "get multiple outputs in one block" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """
+        |print("1")
+        |print("2")
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "[1] \"1\"\n[1] \"2\""
+    ))
+  }
+
   it should "capture stdout" in withInterpreter { interpreter =>
     val response = interpreter.execute("cat(3)")
     response should equal(Interpreter.ExecuteSuccess(

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/f893d199/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
index dad817b..90e2828 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
@@ -39,7 +39,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
       "status" -> "ok",
       "execution_count" -> 0,
       "data" -> Map(
-        "text/plain" -> "res0: Int = 3"
+        "text/plain" -> "res0: Int = 3\n"
       )
     ))
 
@@ -56,7 +56,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
       "status" -> "ok",
       "execution_count" -> 0,
       "data" -> Map(
-        "text/plain" -> "x: Int = 1"
+        "text/plain" -> "x: Int = 1\n"
       )
     ))
 
@@ -70,7 +70,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
       "status" -> "ok",
       "execution_count" -> 1,
       "data" -> Map(
-        "text/plain" -> "y: Int = 2"
+        "text/plain" -> "y: Int = 2\n"
       )
     ))
 
@@ -84,7 +84,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
       "status" -> "ok",
       "execution_count" -> 2,
       "data" -> Map(
-        "text/plain" -> "res0: Int = 3"
+        "text/plain" -> "res0: Int = 3\n"
       )
     ))
 
@@ -100,7 +100,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
       "status" -> "ok",
       "execution_count" -> 0,
       "data" -> Map(
-        "text/plain" -> "Hello World"
+        "text/plain" -> "Hello World\n"
       )
     ))
 
@@ -169,7 +169,7 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
       "status" -> "ok",
       "execution_count" -> 0,
       "data" -> Map(
-        "text/plain" -> "res0: Array[Int] = Array(1, 2)"
+        "text/plain" -> "res0: Array[Int] = Array(1, 2)\n"
       )
     ))
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/f893d199/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index 39790f7..3fa2fc4 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -185,7 +185,7 @@ class InteractiveSessionSpec extends FunSpec
       scalaResult should equal (Extraction.decompose(Map(
         "status" -> "ok",
         "execution_count" -> 1,
-        "data" -> Map("text/plain" -> "res0: Int = 3")))
+        "data" -> Map("text/plain" -> "res0: Int = 3\n")))
       )
 
       val rResult = executeStatement("1 + 2", Some("sparkr"))