You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:54 UTC
[25/33] incubator-livy git commit: LIVY-375. Change Livy code package
name to org.apache.livy
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala b/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala
deleted file mode 100644
index c2eca91..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import java.net.URI
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.apache.spark.launcher.SparkLauncher
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy._
-import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf}
-import com.cloudera.livy.sessions.Spark
-
-class ReplDriverSuite extends FunSuite with LivyBaseUnitTestSuite {
-
- private implicit val formats = DefaultFormats
-
- test("start a repl session using the rsc") {
- val client = new LivyClientBuilder()
- .setConf(SparkLauncher.DRIVER_MEMORY, "512m")
- .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, sys.props("java.class.path"))
- .setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, sys.props("java.class.path"))
- .setConf(RSCConf.Entry.LIVY_JARS.key(), "")
- .setURI(new URI("rsc:/"))
- .setConf(RSCConf.Entry.DRIVER_CLASS.key(), classOf[ReplDriver].getName())
- .setConf(RSCConf.Entry.SESSION_KIND.key(), Spark().toString)
- .build()
- .asInstanceOf[RSCClient]
-
- try {
- // This is sort of what InteractiveSession.scala does to detect an idle session.
- client.submit(new PingJob()).get(60, TimeUnit.SECONDS)
-
- val statementId = client.submitReplCode("1 + 1").get
- eventually(timeout(30 seconds), interval(100 millis)) {
- val rawResult =
- client.getReplJobResults(statementId, 1).get(10, TimeUnit.SECONDS).statements(0)
- val result = rawResult.output
- assert((parse(result) \ Session.STATUS).extract[String] === Session.OK)
- }
- } finally {
- client.stop(true)
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
deleted file mode 100644
index 9df77c3..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import org.apache.spark.SparkConf
-import org.json4s.{DefaultFormats, JValue}
-import org.json4s.JsonDSL._
-
-import com.cloudera.livy.rsc.RSCConf
-
-class ScalaInterpreterSpec extends BaseInterpreterSpec {
-
- implicit val formats = DefaultFormats
-
- override def createInterpreter(): Interpreter =
- new SparkInterpreter(new SparkConf())
-
- it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
- val response = interpreter.execute("1 + 2")
- response should equal (Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "res0: Int = 3"
- ))
- }
-
- it should "execute multiple statements" in withInterpreter { interpreter =>
- var response = interpreter.execute("val x = 1")
- response should equal (Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "x: Int = 1"
- ))
-
- response = interpreter.execute("val y = 2")
- response should equal (Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "y: Int = 2"
- ))
-
- response = interpreter.execute("x + y")
- response should equal (Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "res0: Int = 3"
- ))
- }
-
- it should "execute multiple statements in one block" in withInterpreter { interpreter =>
- val response = interpreter.execute(
- """
- |val x = 1
- |
- |val y = 2
- |
- |x + y
- """.stripMargin)
- response should equal(Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "res2: Int = 3"
- ))
- }
-
- it should "do table magic" in withInterpreter { interpreter =>
- val response = interpreter.execute(
- """val x = List(List(1, "a"), List(3, "b"))
- |%table x
- """.stripMargin)
-
- response should equal(Interpreter.ExecuteSuccess(
- APPLICATION_LIVY_TABLE_JSON -> (
- ("headers" -> List(
- ("type" -> "BIGINT_TYPE") ~ ("name" -> "0"),
- ("type" -> "STRING_TYPE") ~ ("name" -> "1")
- )) ~
- ("data" -> List(
- List[JValue](1, "a"),
- List[JValue](3, "b")
- ))
- )
- ))
- }
-
- it should "allow magic inside statements" in withInterpreter { interpreter =>
- val response = interpreter.execute(
- """val x = List(List(1, "a"), List(3, "b"))
- |%table x
- |1 + 2
- """.stripMargin)
-
- response should equal(Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "res0: Int = 3"
- ))
- }
-
- it should "capture stdout" in withInterpreter { interpreter =>
- val response = interpreter.execute("println(\"Hello World\")")
- response should equal(Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "Hello World"
- ))
- }
-
- it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
- interpreter.execute("x") match {
- case Interpreter.ExecuteError(ename, evalue, _) =>
- ename should equal ("Error")
- evalue should include ("error: not found: value x")
-
- case other =>
- fail(s"Expected error, got $other.")
- }
- }
-
- it should "execute spark commands" in withInterpreter { interpreter =>
- val response = interpreter.execute(
- """sc.parallelize(0 to 1).map { i => i+1 }.collect""".stripMargin)
-
- response should equal(Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "res0: Array[Int] = Array(1, 2)"
- ))
- }
-
- it should "handle statements ending with comments" in withInterpreter { interpreter =>
- // Test statements with only comments
- var response = interpreter.execute("""// comment""")
- response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> ""))
-
- response = interpreter.execute(
- """/*
- |comment
- |*/
- """.stripMargin)
- response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> ""))
-
- // Test statements ending with comments
- response = interpreter.execute(
- """val r = 1
- |// comment
- """.stripMargin)
- response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
-
- response = interpreter.execute(
- """val r = 1
- |/*
- |comment
- |comment
- |*/
- """.stripMargin)
- response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
-
- // Test statements ending with a mix of single line and multi-line comments
- response = interpreter.execute(
- """val r = 1
- |// comment
- |/*
- |comment
- |comment
- |*/
- |// comment
- """.stripMargin)
- response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
-
- response = interpreter.execute(
- """val r = 1
- |/*
- |comment
- |// comment
- |comment
- |*/
- """.stripMargin)
- response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
-
- // Make sure incomplete statement is still returned as incomplete statement.
- response = interpreter.execute("sc.")
- response should equal(Interpreter.ExecuteIncomplete())
-
- // Make sure incomplete statement is still returned as incomplete statement.
- response = interpreter.execute(
- """sc.
- |// comment
- """.stripMargin)
- response should equal(Interpreter.ExecuteIncomplete())
-
- // Make sure our handling doesn't mess up a string with value like comments.
- val tripleQuotes = "\"\"\""
- val stringWithComment = s"/*\ncomment\n*/\n//comment"
- response = interpreter.execute(s"val r = $tripleQuotes$stringWithComment$tripleQuotes")
-
- try {
- response should equal(
- Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String = \n$stringWithComment"))
- } catch {
- case _: Exception =>
- response should equal(
- // Scala 2.11 doesn't have a " " after "="
- Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String =\n$stringWithComment"))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/SessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SessionSpec.scala
deleted file mode 100644
index 92d2322..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/SessionSpec.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import java.util.Properties
-import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
-
-import org.mockito.Mockito.when
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.FunSpec
-import org.scalatest.Matchers._
-import org.scalatest.concurrent.Eventually
-import org.scalatest.mock.MockitoSugar.mock
-import org.scalatest.time._
-
-import com.cloudera.livy.LivyBaseUnitTestSuite
-import com.cloudera.livy.repl.Interpreter.ExecuteResponse
-import com.cloudera.livy.rsc.RSCConf
-
-class SessionSpec extends FunSpec with Eventually with LivyBaseUnitTestSuite {
- override implicit val patienceConfig =
- PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(100, Millis)))
-
- private val rscConf = new RSCConf(new Properties())
-
- describe("Session") {
- it("should call state changed callbacks in happy path") {
- val expectedStateTransitions =
- Array("not_started", "starting", "idle", "busy", "idle", "busy", "idle")
- val actualStateTransitions = new ConcurrentLinkedQueue[String]()
-
- val interpreter = mock[Interpreter]
- when(interpreter.kind).thenAnswer(new Answer[String] {
- override def answer(invocationOnMock: InvocationOnMock): String = "spark"
- })
-
- val session =
- new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
-
- session.start()
-
- session.execute("")
-
- eventually {
- actualStateTransitions.toArray shouldBe expectedStateTransitions
- }
- }
-
- it("should not transit to idle if there're any pending statements.") {
- val expectedStateTransitions =
- Array("not_started", "busy", "busy", "busy", "idle", "busy", "idle")
- val actualStateTransitions = new ConcurrentLinkedQueue[String]()
-
- val interpreter = mock[Interpreter]
- when(interpreter.kind).thenAnswer(new Answer[String] {
- override def answer(invocationOnMock: InvocationOnMock): String = "spark"
- })
-
- val blockFirstExecuteCall = new CountDownLatch(1)
- when(interpreter.execute("")).thenAnswer(new Answer[Interpreter.ExecuteResponse] {
- override def answer(invocation: InvocationOnMock): ExecuteResponse = {
- blockFirstExecuteCall.await(10, TimeUnit.SECONDS)
- null
- }
- })
- val session =
- new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
-
- for (_ <- 1 to 2) {
- session.execute("")
- }
-
- blockFirstExecuteCall.countDown()
- eventually {
- actualStateTransitions.toArray shouldBe expectedStateTransitions
- }
- }
-
- it("should remove old statements when reaching threshold") {
- val interpreter = mock[Interpreter]
- when(interpreter.kind).thenAnswer(new Answer[String] {
- override def answer(invocationOnMock: InvocationOnMock): String = "spark"
- })
-
- rscConf.set(RSCConf.Entry.RETAINED_STATEMENT_NUMBER, 2)
- val session = new Session(rscConf, interpreter)
- session.start()
-
- session.statements.size should be (0)
- session.execute("")
- session.statements.size should be (1)
- session.statements.map(_._1).toSet should be (Set(0))
- session.execute("")
- session.statements.size should be (2)
- session.statements.map(_._1).toSet should be (Set(0, 1))
- session.execute("")
- eventually {
- session.statements.size should be (2)
- session.statements.map(_._1).toSet should be (Set(1, 2))
- }
-
- // Continue submitting statements, total statements in memory should be 2.
- session.execute("")
- eventually {
- session.statements.size should be (2)
- session.statements.map(_._1).toSet should be (Set(2, 3))
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
deleted file mode 100644
index bde49b1..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import org.apache.spark.SparkConf
-import org.json4s.{DefaultFormats, JValue}
-import org.json4s.JsonDSL._
-import org.scalatest._
-
-import com.cloudera.livy.rsc.RSCConf
-
-class SparkRInterpreterSpec extends BaseInterpreterSpec {
-
- implicit val formats = DefaultFormats
-
- override protected def withFixture(test: NoArgTest): Outcome = {
- assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
- super.withFixture(test)
- }
-
- override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
-
- it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
- val response = interpreter.execute("1 + 2")
- response should equal (Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "[1] 3"
- ))
- }
-
- it should "execute multiple statements" in withInterpreter { interpreter =>
- var response = interpreter.execute("x = 1")
- response should equal (Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> ""
- ))
-
- response = interpreter.execute("y = 2")
- response should equal (Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> ""
- ))
-
- response = interpreter.execute("x + y")
- response should equal (Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "[1] 3"
- ))
- }
-
- it should "execute multiple statements in one block" in withInterpreter { interpreter =>
- val response = interpreter.execute(
- """
- |x = 1
- |
- |y = 2
- |
- |x + y
- """.stripMargin)
- response should equal(Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "[1] 3"
- ))
- }
-
- it should "capture stdout" in withInterpreter { interpreter =>
- val response = interpreter.execute("cat(3)")
- response should equal(Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "3"
- ))
- }
-
- it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
- val response = interpreter.execute("x")
- assert(response.isInstanceOf[Interpreter.ExecuteError])
- val errorResponse = response.asInstanceOf[Interpreter.ExecuteError]
- errorResponse.ename should be ("Error")
- assert(errorResponse.evalue.contains("object 'x' not found"))
- }
-
-
- it should "not hang when executing incomplete statements" in withInterpreter { interpreter =>
- val response = interpreter.execute("x[")
- response should equal(Interpreter.ExecuteError(
- "Error",
- """[1] "Error in parse(text = \"x[\"): <text>:2:0: unexpected end of input\n1: x[\n ^""""
- ))
- }
-
- it should "escape the statement" in withInterpreter { interpreter =>
- val response = interpreter.execute("print(\"a\")")
- response should equal(Interpreter.ExecuteSuccess(
- TEXT_PLAIN -> "[1] \"a\""
- ))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
deleted file mode 100644
index 2ca4130..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import org.apache.spark.SparkConf
-import org.json4s.Extraction
-import org.json4s.jackson.JsonMethods.parse
-
-import com.cloudera.livy.rsc.RSCConf
-
-class SparkRSessionSpec extends BaseSessionSpec {
-
- override protected def withFixture(test: NoArgTest) = {
- assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
- super.withFixture(test)
- }
-
- override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
-
- it should "execute `1 + 2` == 3" in withSession { session =>
- val statement = execute(session)("1 + 2")
- statement.id should equal(0)
-
- val result = parse(statement.output)
- val expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> "[1] 3"
- )
- ))
-
- result should equal(expectedResult)
- }
-
- it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
- val executeWithSession = execute(session)(_)
- var statement = executeWithSession("x = 1")
- statement.id should equal (0)
-
- var result = parse(statement.output)
- var expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> ""
- )
- ))
-
- result should equal (expectedResult)
-
- statement = executeWithSession("y = 2")
- statement.id should equal (1)
-
- result = parse(statement.output)
- expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 1,
- "data" -> Map(
- "text/plain" -> ""
- )
- ))
-
- result should equal (expectedResult)
-
- statement = executeWithSession("x + y")
- statement.id should equal (2)
-
- result = parse(statement.output)
- expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 2,
- "data" -> Map(
- "text/plain" -> "[1] 3"
- )
- ))
-
- result should equal (expectedResult)
- }
-
- it should "capture stdout from print" in withSession { session =>
- val statement = execute(session)("""print('Hello World')""")
- statement.id should equal (0)
-
- val result = parse(statement.output)
- val expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> "[1] \"Hello World\""
- )
- ))
-
- result should equal (expectedResult)
- }
-
- it should "capture stdout from cat" in withSession { session =>
- val statement = execute(session)("""cat(3)""")
- statement.id should equal (0)
-
- val result = parse(statement.output)
- val expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> "3"
- )
- ))
-
- result should equal (expectedResult)
- }
-
- it should "report an error if accessing an unknown variable" in withSession { session =>
- val statement = execute(session)("""x""")
- statement.id should equal (0)
-
- val result = parse(statement.output)
- (result \ "status").extract[String] should be ("error")
- (result \ "execution_count").extract[Int] should be (0)
- (result \ "ename").extract[String] should be ("Error")
- assert((result \ "evalue").extract[String].contains("object 'x' not found"))
- (result \ "traceback").extract[List[String]] should be (List())
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
deleted file mode 100644
index 1b2be30..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.apache.spark.SparkConf
-import org.json4s.Extraction
-import org.json4s.JsonAST.JValue
-import org.json4s.jackson.JsonMethods.parse
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy.rsc.RSCConf
-import com.cloudera.livy.rsc.driver.StatementState
-
-class SparkSessionSpec extends BaseSessionSpec {
-
- override def createInterpreter(): Interpreter = new SparkInterpreter(new SparkConf())
-
- it should "execute `1 + 2` == 3" in withSession { session =>
- val statement = execute(session)("1 + 2")
- statement.id should equal (0)
-
- val result = parse(statement.output)
- val expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> "res0: Int = 3"
- )
- ))
-
- result should equal (expectedResult)
- }
-
- it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
- val executeWithSession = execute(session)(_)
- var statement = executeWithSession("val x = 1")
- statement.id should equal (0)
-
- var result = parse(statement.output)
- var expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> "x: Int = 1"
- )
- ))
-
- result should equal (expectedResult)
-
- statement = executeWithSession("val y = 2")
- statement.id should equal (1)
-
- result = parse(statement.output)
- expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 1,
- "data" -> Map(
- "text/plain" -> "y: Int = 2"
- )
- ))
-
- result should equal (expectedResult)
-
- statement = executeWithSession("x + y")
- statement.id should equal (2)
-
- result = parse(statement.output)
- expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 2,
- "data" -> Map(
- "text/plain" -> "res0: Int = 3"
- )
- ))
-
- result should equal (expectedResult)
- }
-
- it should "capture stdout" in withSession { session =>
- val statement = execute(session)("""println("Hello World")""")
- statement.id should equal (0)
-
- val result = parse(statement.output)
- val expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> "Hello World"
- )
- ))
-
- result should equal (expectedResult)
- }
-
- it should "report an error if accessing an unknown variable" in withSession { session =>
- val statement = execute(session)("""x""")
- statement.id should equal (0)
-
- val result = parse(statement.output)
-
- def extract(key: String): String = (result \ key).extract[String]
-
- extract("status") should equal ("error")
- extract("execution_count") should equal ("0")
- extract("ename") should equal ("Error")
- extract("evalue") should include ("error: not found: value x")
- }
-
- it should "report an error if exception is thrown" in withSession { session =>
- val statement = execute(session)(
- """def func1() {
- |throw new Exception()
- |}
- |func1()""".stripMargin)
- statement.id should equal (0)
-
- val result = parse(statement.output)
- val resultMap = result.extract[Map[String, JValue]]
-
- // Manually extract the values since the line numbers in the exception could change.
- resultMap("status").extract[String] should equal ("error")
- resultMap("execution_count").extract[Int] should equal (0)
- resultMap("ename").extract[String] should equal ("Error")
- resultMap("evalue").extract[String] should include ("java.lang.Exception")
-
- val traceback = resultMap("traceback").extract[Seq[String]]
- traceback(0) should include ("func1(<console>:")
- }
-
- it should "access the spark context" in withSession { session =>
- val statement = execute(session)("""sc""")
- statement.id should equal (0)
-
- val result = parse(statement.output)
- val resultMap = result.extract[Map[String, JValue]]
-
- // Manually extract the values since the line numbers in the exception could change.
- resultMap("status").extract[String] should equal ("ok")
- resultMap("execution_count").extract[Int] should equal (0)
-
- val data = resultMap("data").extract[Map[String, JValue]]
- data("text/plain").extract[String] should include (
- "res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext")
- }
-
- it should "execute spark commands" in withSession { session =>
- val statement = execute(session)(
- """sc.parallelize(0 to 1).map{i => i+1}.collect""".stripMargin)
- statement.id should equal (0)
-
- val result = parse(statement.output)
-
- val expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> "res0: Array[Int] = Array(1, 2)"
- )
- ))
-
- result should equal (expectedResult)
- }
-
- it should "do table magic" in withSession { session =>
- val statement = execute(session)("val x = List((1, \"a\"), (3, \"b\"))\n%table x")
- statement.id should equal (0)
-
- val result = parse(statement.output)
-
- val expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "application/vnd.livy.table.v1+json" -> Map(
- "headers" -> List(
- Map("type" -> "BIGINT_TYPE", "name" -> "_1"),
- Map("type" -> "STRING_TYPE", "name" -> "_2")),
- "data" -> List(List(1, "a"), List(3, "b"))
- )
- )
- ))
-
- result should equal (expectedResult)
- }
-
- it should "cancel spark jobs" in withSession { session =>
- val stmtId = session.execute(
- """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
- eventually(timeout(30 seconds), interval(100 millis)) {
- assert(session.statements(stmtId).state.get() == StatementState.Running)
- }
- session.cancel(stmtId)
-
- eventually(timeout(30 seconds), interval(100 millis)) {
- assert(session.statements(stmtId).state.get() == StatementState.Cancelled)
- session.statements(stmtId).output should include (
- "Job 0 cancelled part of cancelled job group 0")
- }
- }
-
- it should "cancel waiting statement" in withSession { session =>
- val stmtId1 = session.execute(
- """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
- val stmtId2 = session.execute(
- """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
- eventually(timeout(30 seconds), interval(100 millis)) {
- assert(session.statements(stmtId1).state.get() == StatementState.Running)
- }
-
- assert(session.statements(stmtId2).state.get() == StatementState.Waiting)
-
- session.cancel(stmtId2)
- assert(session.statements(stmtId2).state.get() == StatementState.Cancelled)
-
- session.cancel(stmtId1)
- assert(session.statements(stmtId1).state.get() == StatementState.Cancelling)
- eventually(timeout(30 seconds), interval(100 millis)) {
- assert(session.statements(stmtId1).state.get() == StatementState.Cancelled)
- session.statements(stmtId1).output should include (
- "Job 0 cancelled part of cancelled job group 0")
- }
- }
-
- it should "correctly calculate progress" in withSession { session =>
- val executeCode =
- """
- |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
- """.stripMargin
-
- val stmtId = session.execute(executeCode)
- eventually(timeout(30 seconds), interval(100 millis)) {
- session.progressOfStatement(stmtId) should be(1.0)
- }
- }
-
- it should "not generate Spark jobs for plain Scala code" in withSession { session =>
- val executeCode = """1 + 1"""
-
- val stmtId = session.execute(executeCode)
- session.progressOfStatement(stmtId) should be (0.0)
- }
-
- it should "handle multiple jobs in one statement" in withSession { session =>
- val executeCode =
- """
- |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
- |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
- """.stripMargin
-
- val stmtId = session.execute(executeCode)
- eventually(timeout(30 seconds), interval(100 millis)) {
- session.progressOfStatement(stmtId) should be(1.0)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/BaseInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/BaseInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/BaseInterpreterSpec.scala
new file mode 100644
index 0000000..b3d4d5a
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/BaseInterpreterSpec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.livy.LivyBaseUnitTestSuite
+
+abstract class BaseInterpreterSpec extends FlatSpec with Matchers with LivyBaseUnitTestSuite {
+
+ def createInterpreter(): Interpreter
+
+ def withInterpreter(testCode: Interpreter => Any): Unit = {
+ val interpreter = createInterpreter()
+ try {
+ interpreter.start()
+ testCode(interpreter)
+ } finally {
+ interpreter.close()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
new file mode 100644
index 0000000..a13f924
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.json4s._
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.LivyBaseUnitTestSuite
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.rsc.driver.{Statement, StatementState}
+import org.apache.livy.sessions.SessionState
+
+abstract class BaseSessionSpec extends FlatSpec with Matchers with LivyBaseUnitTestSuite {
+
+ implicit val formats = DefaultFormats
+
+ private val rscConf = new RSCConf(new Properties())
+
+ protected def execute(session: Session)(code: String): Statement = {
+ val id = session.execute(code)
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ val s = session.statements(id)
+ s.state.get() shouldBe StatementState.Available
+ s
+ }
+ }
+
+ protected def withSession(testCode: Session => Any): Unit = {
+ val stateChangedCalled = new AtomicInteger()
+ val session =
+ new Session(rscConf, createInterpreter(), { _ => stateChangedCalled.incrementAndGet() })
+ try {
+ // Session's constructor should fire an initial state change event.
+ stateChangedCalled.intValue() shouldBe 1
+ Await.ready(session.start(), 30 seconds)
+ assert(session.state === SessionState.Idle())
+ // There should be at least 1 state change event fired when session transits to idle.
+ stateChangedCalled.intValue() should (be > 1)
+ testCode(session)
+ } finally {
+ session.close()
+ }
+ }
+
+ protected def createInterpreter(): Interpreter
+
+ it should "start in the starting or idle state" in {
+ val session = new Session(rscConf, createInterpreter())
+ val future = session.start()
+ try {
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ session.state should (equal (SessionState.Starting()) or equal (SessionState.Idle()))
+ }
+ Await.ready(future, 60 seconds)
+ } finally {
+ session.close()
+ }
+ }
+
+ it should "eventually become the idle state" in withSession { session =>
+ session.state should equal (SessionState.Idle())
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
new file mode 100644
index 0000000..1404765
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.{DefaultFormats, JNull, JValue}
+import org.json4s.JsonDSL._
+import org.scalatest._
+
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.sessions._
+
+abstract class PythonBaseInterpreterSpec extends BaseInterpreterSpec {
+
+ it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
+ val response = interpreter.execute("1 + 2")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "3"
+ ))
+ }
+
+ it should "execute multiple statements" in withInterpreter { interpreter =>
+ var response = interpreter.execute("x = 1")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> ""
+ ))
+
+ response = interpreter.execute("y = 2")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> ""
+ ))
+
+ response = interpreter.execute("x + y")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "3"
+ ))
+ }
+
+ it should "execute multiple statements in one block" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """
+ |x = 1
+ |
+ |y = 2
+ |
+ |x + y
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "3"
+ ))
+ }
+
+ it should "parse a class" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """
+ |class Counter(object):
+ | def __init__(self):
+ | self.count = 0
+ |
+ | def add_one(self):
+ | self.count += 1
+ |
+ | def add_two(self):
+ | self.count += 2
+ |
+ |counter = Counter()
+ |counter.add_one()
+ |counter.add_two()
+ |counter.count
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "3"
+ ))
+ }
+
+ it should "do json magic" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """x = [[1, 'a'], [3, 'b']]
+ |%json x
+ """.stripMargin)
+
+ response should equal(Interpreter.ExecuteSuccess(
+ APPLICATION_JSON -> List[JValue](
+ List[JValue](1, "a"),
+ List[JValue](3, "b")
+ )
+ ))
+ }
+
+ it should "do table magic" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """x = [[1, 'a'], [3, 'b']]
+ |%table x
+ """.stripMargin)
+
+ response should equal(Interpreter.ExecuteSuccess(
+ APPLICATION_LIVY_TABLE_JSON -> (
+ ("headers" -> List(
+ ("type" -> "INT_TYPE") ~ ("name" -> "0"),
+ ("type" -> "STRING_TYPE") ~ ("name" -> "1")
+ )) ~
+ ("data" -> List(
+ List[JValue](1, "a"),
+ List[JValue](3, "b")
+ ))
+ )
+ ))
+ }
+
+ it should "do table magic with None type value" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """x = [{"a":"1", "b":None}, {"a":"2", "b":2}]
+ |%table x
+ """.stripMargin)
+
+ response should equal(Interpreter.ExecuteSuccess(
+ APPLICATION_LIVY_TABLE_JSON -> (
+ ("headers" -> List(
+ ("type" -> "STRING_TYPE") ~ ("name" -> "a"),
+ ("type" -> "INT_TYPE") ~ ("name" -> "b")
+ )) ~
+ ("data" -> List(
+ List[JValue]("1", JNull),
+ List[JValue]("2", 2)
+ ))
+ )
+ ))
+ }
+
+ it should "do table magic with None type Row" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """x = [{"a":None, "b":None}, {"a":"2", "b":2}]
+ |%table x
+ """.stripMargin)
+
+ response should equal(Interpreter.ExecuteSuccess(
+ APPLICATION_LIVY_TABLE_JSON -> (
+ ("headers" -> List(
+ ("type" -> "STRING_TYPE") ~ ("name" -> "a"),
+ ("type" -> "INT_TYPE") ~ ("name" -> "b")
+ )) ~
+ ("data" -> List(
+ List[JValue](JNull, JNull),
+ List[JValue]("2", 2)
+ ))
+ )
+ ))
+ }
+
+ it should "allow magic inside statements" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """x = [[1, 'a'], [3, 'b']]
+ |%table x
+ |1 + 2
+ """.stripMargin)
+
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "3"
+ ))
+ }
+
+ it should "capture stdout" in withInterpreter { interpreter =>
+ val response = interpreter.execute("print('Hello World')")
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "Hello World"
+ ))
+ }
+
+ it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
+ val response = interpreter.execute("x")
+ response should equal(Interpreter.ExecuteError(
+ "NameError",
+ "name 'x' is not defined",
+ List(
+ "Traceback (most recent call last):\n",
+ "NameError: name 'x' is not defined\n"
+ )
+ ))
+ }
+
+ it should "report an error if empty magic command" in withInterpreter { interpreter =>
+ val response = interpreter.execute("%")
+ response should equal(Interpreter.ExecuteError(
+ "UnknownMagic",
+ "magic command not specified",
+ List("UnknownMagic: magic command not specified\n")
+ ))
+ }
+
+ it should "report an error if unknown magic command" in withInterpreter { interpreter =>
+ val response = interpreter.execute("%foo")
+ response should equal(Interpreter.ExecuteError(
+ "UnknownMagic",
+ "unknown magic command 'foo'",
+ List("UnknownMagic: unknown magic command 'foo'\n")
+ ))
+ }
+
+ it should "not execute part of the block if there is a syntax error" in withInterpreter { intp =>
+ var response = intp.execute(
+ """x = 1
+ |'
+ """.stripMargin)
+
+ response should equal(Interpreter.ExecuteError(
+ "SyntaxError",
+ "EOL while scanning string literal (<stdin>, line 2)",
+ List(
+ " File \"<stdin>\", line 2\n",
+ " '\n",
+ " ^\n",
+ "SyntaxError: EOL while scanning string literal\n"
+ )
+ ))
+
+ response = intp.execute("x")
+ response should equal(Interpreter.ExecuteError(
+ "NameError",
+ "name 'x' is not defined",
+ List(
+ "Traceback (most recent call last):\n",
+ "NameError: name 'x' is not defined\n"
+ )
+ ))
+ }
+}
+
+class Python2InterpreterSpec extends PythonBaseInterpreterSpec {
+
+ implicit val formats = DefaultFormats
+
+ override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark())
+
+ // Scalastyle is treating unicode escape as non ascii characters. Turn off the check.
+ // scalastyle:off non.ascii.character.disallowed
+ it should "print unicode correctly" in withInterpreter { intp =>
+ intp.execute("print(u\"\u263A\")") should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "\u263A"
+ ))
+ intp.execute("""print(u"\u263A")""") should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "\u263A"
+ ))
+ intp.execute("""print("\xE2\x98\xBA")""") should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "\u263A"
+ ))
+ }
+ // scalastyle:on non.ascii.character.disallowed
+}
+
+class Python3InterpreterSpec extends PythonBaseInterpreterSpec {
+
+ implicit val formats = DefaultFormats
+
+ override protected def withFixture(test: NoArgTest): Outcome = {
+ assume(!sys.props.getOrElse("skipPySpark3Tests", "false").toBoolean, "Skipping PySpark3 tests.")
+ test()
+ }
+
+ override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark3())
+
+ it should "check python version is 3.x" in withInterpreter { interpreter =>
+ val response = interpreter.execute("""import sys
+ |sys.version >= '3'
+ """.stripMargin)
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "True"
+ ))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
new file mode 100644
index 0000000..0883f27
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.Extraction
+import org.json4s.jackson.JsonMethods.parse
+import org.scalatest._
+
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.sessions._
+
+abstract class PythonSessionSpec extends BaseSessionSpec {
+
+ it should "execute `1 + 2` == 3" in withSession { session =>
+ val statement = execute(session)("1 + 2")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "3"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
+ val executeWithSession = execute(session)(_)
+ var statement = executeWithSession("x = 1")
+ statement.id should equal (0)
+
+ var result = parse(statement.output)
+ var expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> ""
+ )
+ ))
+
+ result should equal (expectedResult)
+
+ statement = executeWithSession("y = 2")
+ statement.id should equal (1)
+
+ result = parse(statement.output)
+ expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 1,
+ "data" -> Map(
+ "text/plain" -> ""
+ )
+ ))
+
+ result should equal (expectedResult)
+
+ statement = executeWithSession("x + y")
+ statement.id should equal (2)
+
+ result = parse(statement.output)
+ expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 2,
+ "data" -> Map(
+ "text/plain" -> "3"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "do table magic" in withSession { session =>
+ val statement = execute(session)("x = [[1, 'a'], [3, 'b']]\n%table x")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "application/vnd.livy.table.v1+json" -> Map(
+ "headers" -> List(
+ Map("type" -> "INT_TYPE", "name" -> "0"),
+ Map("type" -> "STRING_TYPE", "name" -> "1")),
+ "data" -> List(List(1, "a"), List(3, "b"))
+ )
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "capture stdout" in withSession { session =>
+ val statement = execute(session)("""print('Hello World')""")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "Hello World"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "report an error if accessing an unknown variable" in withSession { session =>
+ val statement = execute(session)("""x""")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "error",
+ "execution_count" -> 0,
+ "traceback" -> List(
+ "Traceback (most recent call last):\n",
+ "NameError: name 'x' is not defined\n"
+ ),
+ "ename" -> "NameError",
+ "evalue" -> "name 'x' is not defined"
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "report an error if exception is thrown" in withSession { session =>
+ val statement = execute(session)(
+ """def func1():
+ | raise Exception("message")
+ |def func2():
+ | func1()
+ |func2()
+ """.stripMargin)
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "error",
+ "execution_count" -> 0,
+ "traceback" -> List(
+ "Traceback (most recent call last):\n",
+ " File \"<stdin>\", line 4, in func2\n",
+ " File \"<stdin>\", line 2, in func1\n",
+ "Exception: message\n"
+ ),
+ "ename" -> "Exception",
+ "evalue" -> "message"
+ ))
+
+ result should equal (expectedResult)
+ }
+}
+
+class Python2SessionSpec extends PythonSessionSpec {
+ override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark())
+}
+
+class Python3SessionSpec extends PythonSessionSpec {
+
+ override protected def withFixture(test: NoArgTest): Outcome = {
+ assume(!sys.props.getOrElse("skipPySpark3Tests", "false").toBoolean, "Skipping PySpark3 tests.")
+ test()
+ }
+
+ override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark3())
+
+ it should "check python version is 3.x" in withSession { session =>
+ val statement = execute(session)(
+ """import sys
+ |sys.version >= '3'
+ """.stripMargin)
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "True"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala b/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
new file mode 100644
index 0000000..6537f0c
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import java.net.URI
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.spark.launcher.SparkLauncher
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy._
+import org.apache.livy.rsc.{PingJob, RSCClient, RSCConf}
+import org.apache.livy.sessions.Spark
+
+class ReplDriverSuite extends FunSuite with LivyBaseUnitTestSuite {
+
+ private implicit val formats = DefaultFormats
+
+ test("start a repl session using the rsc") {
+ val client = new LivyClientBuilder()
+ .setConf(SparkLauncher.DRIVER_MEMORY, "512m")
+ .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, sys.props("java.class.path"))
+ .setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, sys.props("java.class.path"))
+ .setConf(RSCConf.Entry.LIVY_JARS.key(), "")
+ .setURI(new URI("rsc:/"))
+ .setConf(RSCConf.Entry.DRIVER_CLASS.key(), classOf[ReplDriver].getName())
+ .setConf(RSCConf.Entry.SESSION_KIND.key(), Spark().toString)
+ .build()
+ .asInstanceOf[RSCClient]
+
+ try {
+ // This is sort of what InteractiveSession.scala does to detect an idle session.
+ client.submit(new PingJob()).get(60, TimeUnit.SECONDS)
+
+ val statementId = client.submitReplCode("1 + 1").get
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ val rawResult =
+ client.getReplJobResults(statementId, 1).get(10, TimeUnit.SECONDS).statements(0)
+ val result = rawResult.output
+ assert((parse(result) \ Session.STATUS).extract[String] === Session.OK)
+ }
+ } finally {
+ client.stop(true)
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
new file mode 100644
index 0000000..3e9ee82
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.{DefaultFormats, JValue}
+import org.json4s.JsonDSL._
+
+import org.apache.livy.rsc.RSCConf
+
+class ScalaInterpreterSpec extends BaseInterpreterSpec {
+
+ implicit val formats = DefaultFormats
+
+ override def createInterpreter(): Interpreter =
+ new SparkInterpreter(new SparkConf())
+
+ it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
+ val response = interpreter.execute("1 + 2")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "res0: Int = 3"
+ ))
+ }
+
+ it should "execute multiple statements" in withInterpreter { interpreter =>
+ var response = interpreter.execute("val x = 1")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "x: Int = 1"
+ ))
+
+ response = interpreter.execute("val y = 2")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "y: Int = 2"
+ ))
+
+ response = interpreter.execute("x + y")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "res0: Int = 3"
+ ))
+ }
+
+ it should "execute multiple statements in one block" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """
+ |val x = 1
+ |
+ |val y = 2
+ |
+ |x + y
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "res2: Int = 3"
+ ))
+ }
+
+ it should "do table magic" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """val x = List(List(1, "a"), List(3, "b"))
+ |%table x
+ """.stripMargin)
+
+ response should equal(Interpreter.ExecuteSuccess(
+ APPLICATION_LIVY_TABLE_JSON -> (
+ ("headers" -> List(
+ ("type" -> "BIGINT_TYPE") ~ ("name" -> "0"),
+ ("type" -> "STRING_TYPE") ~ ("name" -> "1")
+ )) ~
+ ("data" -> List(
+ List[JValue](1, "a"),
+ List[JValue](3, "b")
+ ))
+ )
+ ))
+ }
+
+ it should "allow magic inside statements" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """val x = List(List(1, "a"), List(3, "b"))
+ |%table x
+ |1 + 2
+ """.stripMargin)
+
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "res0: Int = 3"
+ ))
+ }
+
+ it should "capture stdout" in withInterpreter { interpreter =>
+ val response = interpreter.execute("println(\"Hello World\")")
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "Hello World"
+ ))
+ }
+
+ it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
+ interpreter.execute("x") match {
+ case Interpreter.ExecuteError(ename, evalue, _) =>
+ ename should equal ("Error")
+ evalue should include ("error: not found: value x")
+
+ case other =>
+ fail(s"Expected error, got $other.")
+ }
+ }
+
+ it should "execute spark commands" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """sc.parallelize(0 to 1).map { i => i+1 }.collect""".stripMargin)
+
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "res0: Array[Int] = Array(1, 2)"
+ ))
+ }
+
+ it should "handle statements ending with comments" in withInterpreter { interpreter =>
+ // Test statements with only comments
+ var response = interpreter.execute("""// comment""")
+ response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> ""))
+
+ response = interpreter.execute(
+ """/*
+ |comment
+ |*/
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> ""))
+
+ // Test statements ending with comments
+ response = interpreter.execute(
+ """val r = 1
+ |// comment
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+
+ response = interpreter.execute(
+ """val r = 1
+ |/*
+ |comment
+ |comment
+ |*/
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+
+ // Test statements ending with a mix of single line and multi-line comments
+ response = interpreter.execute(
+ """val r = 1
+ |// comment
+ |/*
+ |comment
+ |comment
+ |*/
+ |// comment
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+
+ response = interpreter.execute(
+ """val r = 1
+ |/*
+ |comment
+ |// comment
+ |comment
+ |*/
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+
+ // Make sure incomplete statement is still returned as incomplete statement.
+ response = interpreter.execute("sc.")
+ response should equal(Interpreter.ExecuteIncomplete())
+
+ // Make sure incomplete statement is still returned as incomplete statement.
+ response = interpreter.execute(
+ """sc.
+ |// comment
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteIncomplete())
+
+ // Make sure our handling doesn't mess up a string with value like comments.
+ val tripleQuotes = "\"\"\""
+ val stringWithComment = s"/*\ncomment\n*/\n//comment"
+ response = interpreter.execute(s"val r = $tripleQuotes$stringWithComment$tripleQuotes")
+
+ try {
+ response should equal(
+ Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String = \n$stringWithComment"))
+ } catch {
+ case _: Exception =>
+ response should equal(
+ // Scala 2.11 doesn't have a " " after "="
+ Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String =\n$stringWithComment"))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
new file mode 100644
index 0000000..090b7cb
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import java.util.Properties
+import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
+
+import org.mockito.Mockito.when
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+import org.scalatest.concurrent.Eventually
+import org.scalatest.mock.MockitoSugar.mock
+import org.scalatest.time._
+
+import org.apache.livy.LivyBaseUnitTestSuite
+import org.apache.livy.repl.Interpreter.ExecuteResponse
+import org.apache.livy.rsc.RSCConf
+
+class SessionSpec extends FunSpec with Eventually with LivyBaseUnitTestSuite {
+ override implicit val patienceConfig =
+ PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(100, Millis)))
+
+ private val rscConf = new RSCConf(new Properties())
+
+ describe("Session") {
+ it("should call state changed callbacks in happy path") {
+ val expectedStateTransitions =
+ Array("not_started", "starting", "idle", "busy", "idle", "busy", "idle")
+ val actualStateTransitions = new ConcurrentLinkedQueue[String]()
+
+ val interpreter = mock[Interpreter]
+ when(interpreter.kind).thenAnswer(new Answer[String] {
+ override def answer(invocationOnMock: InvocationOnMock): String = "spark"
+ })
+
+ val session =
+ new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
+
+ session.start()
+
+ session.execute("")
+
+ eventually {
+ actualStateTransitions.toArray shouldBe expectedStateTransitions
+ }
+ }
+
+ it("should not transit to idle if there're any pending statements.") {
+ val expectedStateTransitions =
+ Array("not_started", "busy", "busy", "busy", "idle", "busy", "idle")
+ val actualStateTransitions = new ConcurrentLinkedQueue[String]()
+
+ val interpreter = mock[Interpreter]
+ when(interpreter.kind).thenAnswer(new Answer[String] {
+ override def answer(invocationOnMock: InvocationOnMock): String = "spark"
+ })
+
+ val blockFirstExecuteCall = new CountDownLatch(1)
+ when(interpreter.execute("")).thenAnswer(new Answer[Interpreter.ExecuteResponse] {
+ override def answer(invocation: InvocationOnMock): ExecuteResponse = {
+ blockFirstExecuteCall.await(10, TimeUnit.SECONDS)
+ null
+ }
+ })
+ val session =
+ new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
+
+ for (_ <- 1 to 2) {
+ session.execute("")
+ }
+
+ blockFirstExecuteCall.countDown()
+ eventually {
+ actualStateTransitions.toArray shouldBe expectedStateTransitions
+ }
+ }
+
+ it("should remove old statements when reaching threshold") {
+ val interpreter = mock[Interpreter]
+ when(interpreter.kind).thenAnswer(new Answer[String] {
+ override def answer(invocationOnMock: InvocationOnMock): String = "spark"
+ })
+
+ rscConf.set(RSCConf.Entry.RETAINED_STATEMENT_NUMBER, 2)
+ val session = new Session(rscConf, interpreter)
+ session.start()
+
+ session.statements.size should be (0)
+ session.execute("")
+ session.statements.size should be (1)
+ session.statements.map(_._1).toSet should be (Set(0))
+ session.execute("")
+ session.statements.size should be (2)
+ session.statements.map(_._1).toSet should be (Set(0, 1))
+ session.execute("")
+ eventually {
+ session.statements.size should be (2)
+ session.statements.map(_._1).toSet should be (Set(1, 2))
+ }
+
+ // Continue submitting statements, total statements in memory should be 2.
+ session.execute("")
+ eventually {
+ session.statements.size should be (2)
+ session.statements.map(_._1).toSet should be (Set(2, 3))
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
new file mode 100644
index 0000000..374afbc
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.{DefaultFormats, JValue}
+import org.json4s.JsonDSL._
+import org.scalatest._
+
+import org.apache.livy.rsc.RSCConf
+
+class SparkRInterpreterSpec extends BaseInterpreterSpec {
+
+ implicit val formats = DefaultFormats
+
+ override protected def withFixture(test: NoArgTest): Outcome = {
+ assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
+ super.withFixture(test)
+ }
+
+ override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
+
+ it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
+ val response = interpreter.execute("1 + 2")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "[1] 3"
+ ))
+ }
+
+ it should "execute multiple statements" in withInterpreter { interpreter =>
+ var response = interpreter.execute("x = 1")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> ""
+ ))
+
+ response = interpreter.execute("y = 2")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> ""
+ ))
+
+ response = interpreter.execute("x + y")
+ response should equal (Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "[1] 3"
+ ))
+ }
+
+ it should "execute multiple statements in one block" in withInterpreter { interpreter =>
+ val response = interpreter.execute(
+ """
+ |x = 1
+ |
+ |y = 2
+ |
+ |x + y
+ """.stripMargin)
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "[1] 3"
+ ))
+ }
+
+ it should "capture stdout" in withInterpreter { interpreter =>
+ val response = interpreter.execute("cat(3)")
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "3"
+ ))
+ }
+
+ it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
+ val response = interpreter.execute("x")
+ assert(response.isInstanceOf[Interpreter.ExecuteError])
+ val errorResponse = response.asInstanceOf[Interpreter.ExecuteError]
+ errorResponse.ename should be ("Error")
+ assert(errorResponse.evalue.contains("object 'x' not found"))
+ }
+
+
+ it should "not hang when executing incomplete statements" in withInterpreter { interpreter =>
+ val response = interpreter.execute("x[")
+ response should equal(Interpreter.ExecuteError(
+ "Error",
+ """[1] "Error in parse(text = \"x[\"): <text>:2:0: unexpected end of input\n1: x[\n ^""""
+ ))
+ }
+
+ it should "escape the statement" in withInterpreter { interpreter =>
+ val response = interpreter.execute("print(\"a\")")
+ response should equal(Interpreter.ExecuteSuccess(
+ TEXT_PLAIN -> "[1] \"a\""
+ ))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
new file mode 100644
index 0000000..42ed60a
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.Extraction
+import org.json4s.jackson.JsonMethods.parse
+
+import org.apache.livy.rsc.RSCConf
+
+class SparkRSessionSpec extends BaseSessionSpec {
+
+ override protected def withFixture(test: NoArgTest) = {
+ assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
+ super.withFixture(test)
+ }
+
+ override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
+
+ it should "execute `1 + 2` == 3" in withSession { session =>
+ val statement = execute(session)("1 + 2")
+ statement.id should equal(0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "[1] 3"
+ )
+ ))
+
+ result should equal(expectedResult)
+ }
+
+ it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
+ val executeWithSession = execute(session)(_)
+ var statement = executeWithSession("x = 1")
+ statement.id should equal (0)
+
+ var result = parse(statement.output)
+ var expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> ""
+ )
+ ))
+
+ result should equal (expectedResult)
+
+ statement = executeWithSession("y = 2")
+ statement.id should equal (1)
+
+ result = parse(statement.output)
+ expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 1,
+ "data" -> Map(
+ "text/plain" -> ""
+ )
+ ))
+
+ result should equal (expectedResult)
+
+ statement = executeWithSession("x + y")
+ statement.id should equal (2)
+
+ result = parse(statement.output)
+ expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 2,
+ "data" -> Map(
+ "text/plain" -> "[1] 3"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "capture stdout from print" in withSession { session =>
+ val statement = execute(session)("""print('Hello World')""")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "[1] \"Hello World\""
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "capture stdout from cat" in withSession { session =>
+ val statement = execute(session)("""cat(3)""")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ val expectedResult = Extraction.decompose(Map(
+ "status" -> "ok",
+ "execution_count" -> 0,
+ "data" -> Map(
+ "text/plain" -> "3"
+ )
+ ))
+
+ result should equal (expectedResult)
+ }
+
+ it should "report an error if accessing an unknown variable" in withSession { session =>
+ val statement = execute(session)("""x""")
+ statement.id should equal (0)
+
+ val result = parse(statement.output)
+ (result \ "status").extract[String] should be ("error")
+ (result \ "execution_count").extract[Int] should be (0)
+ (result \ "ename").extract[String] should be ("Error")
+ assert((result \ "evalue").extract[String].contains("object 'x' not found"))
+ (result \ "traceback").extract[List[String]] should be (List())
+ }
+
+}