Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:54 UTC

[25/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy
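
The change is mechanical: each file's package declaration and com.cloudera.livy imports move to the org.apache.livy prefix, and the file paths move from .../com/cloudera/livy/... to .../org/apache/livy/... in step, which is what the deleted/added file pairs below show for the repl test suites. A minimal before/after sketch of the pattern (illustrative only, taken from the ReplDriverSuite diff):

    // before: repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala
    package com.cloudera.livy.repl
    import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf}

    // after: repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
    package org.apache.livy.repl
    import org.apache.livy.rsc.{PingJob, RSCClient, RSCConf}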

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala b/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala
deleted file mode 100644
index c2eca91..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import java.net.URI
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.apache.spark.launcher.SparkLauncher
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy._
-import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf}
-import com.cloudera.livy.sessions.Spark
-
-class ReplDriverSuite extends FunSuite with LivyBaseUnitTestSuite {
-
-  private implicit val formats = DefaultFormats
-
-  test("start a repl session using the rsc") {
-    val client = new LivyClientBuilder()
-      .setConf(SparkLauncher.DRIVER_MEMORY, "512m")
-      .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, sys.props("java.class.path"))
-      .setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, sys.props("java.class.path"))
-      .setConf(RSCConf.Entry.LIVY_JARS.key(), "")
-      .setURI(new URI("rsc:/"))
-      .setConf(RSCConf.Entry.DRIVER_CLASS.key(), classOf[ReplDriver].getName())
-      .setConf(RSCConf.Entry.SESSION_KIND.key(), Spark().toString)
-      .build()
-      .asInstanceOf[RSCClient]
-
-    try {
-      // This roughly mirrors what InteractiveSession.scala does to detect an idle session.
-      client.submit(new PingJob()).get(60, TimeUnit.SECONDS)
-
-      val statementId = client.submitReplCode("1 + 1").get
-      eventually(timeout(30 seconds), interval(100 millis)) {
-        val rawResult =
-          client.getReplJobResults(statementId, 1).get(10, TimeUnit.SECONDS).statements(0)
-        val result = rawResult.output
-        assert((parse(result) \ Session.STATUS).extract[String] === Session.OK)
-      }
-    } finally {
-      client.stop(true)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
deleted file mode 100644
index 9df77c3..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import org.apache.spark.SparkConf
-import org.json4s.{DefaultFormats, JValue}
-import org.json4s.JsonDSL._
-
-import com.cloudera.livy.rsc.RSCConf
-
-class ScalaInterpreterSpec extends BaseInterpreterSpec {
-
-  implicit val formats = DefaultFormats
-
-  override def createInterpreter(): Interpreter =
-    new SparkInterpreter(new SparkConf())
-
-  it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
-    val response = interpreter.execute("1 + 2")
-    response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res0: Int = 3"
-    ))
-  }
-
-  it should "execute multiple statements" in withInterpreter { interpreter =>
-    var response = interpreter.execute("val x = 1")
-    response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "x: Int = 1"
-    ))
-
-    response = interpreter.execute("val y = 2")
-    response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "y: Int = 2"
-    ))
-
-    response = interpreter.execute("x + y")
-    response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res0: Int = 3"
-    ))
-  }
-
-  it should "execute multiple statements in one block" in withInterpreter { interpreter =>
-    val response = interpreter.execute(
-      """
-        |val x = 1
-        |
-        |val y = 2
-        |
-        |x + y
-      """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res2: Int = 3"
-    ))
-  }
-
-  it should "do table magic" in withInterpreter { interpreter =>
-    val response = interpreter.execute(
-      """val x = List(List(1, "a"), List(3, "b"))
-        |%table x
-      """.stripMargin)
-
-    response should equal(Interpreter.ExecuteSuccess(
-      APPLICATION_LIVY_TABLE_JSON -> (
-        ("headers" -> List(
-          ("type" -> "BIGINT_TYPE") ~ ("name" -> "0"),
-          ("type" -> "STRING_TYPE") ~ ("name" -> "1")
-        )) ~
-          ("data" -> List(
-            List[JValue](1, "a"),
-            List[JValue](3, "b")
-          ))
-        )
-    ))
-  }
-
-  it should "allow magic inside statements" in withInterpreter { interpreter =>
-    val response = interpreter.execute(
-      """val x = List(List(1, "a"), List(3, "b"))
-        |%table x
-        |1 + 2
-      """.stripMargin)
-
-    response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res0: Int = 3"
-    ))
-  }
-
-  it should "capture stdout" in withInterpreter { interpreter =>
-    val response = interpreter.execute("println(\"Hello World\")")
-    response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "Hello World"
-    ))
-  }
-
-  it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
-    interpreter.execute("x") match {
-      case Interpreter.ExecuteError(ename, evalue, _) =>
-        ename should equal ("Error")
-        evalue should include ("error: not found: value x")
-
-      case other =>
-        fail(s"Expected error, got $other.")
-    }
-  }
-
-  it should "execute spark commands" in withInterpreter { interpreter =>
-    val response = interpreter.execute(
-      """sc.parallelize(0 to 1).map { i => i+1 }.collect""".stripMargin)
-
-    response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "res0: Array[Int] = Array(1, 2)"
-    ))
-  }
-
-  it should "handle statements ending with comments" in withInterpreter { interpreter =>
-    // Test statements with only comments
-    var response = interpreter.execute("""// comment""")
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> ""))
-
-    response = interpreter.execute(
-      """/*
-        |comment
-        |*/
-      """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> ""))
-
-    // Test statements ending with comments
-    response = interpreter.execute(
-      """val r = 1
-        |// comment
-      """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
-
-    response = interpreter.execute(
-      """val r = 1
-        |/*
-        |comment
-        |comment
-        |*/
-      """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
-
-    // Test statements ending with a mix of single line and multi-line comments
-    response = interpreter.execute(
-      """val r = 1
-        |// comment
-        |/*
-        |comment
-        |comment
-        |*/
-        |// comment
-      """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
-
-    response = interpreter.execute(
-      """val r = 1
-        |/*
-        |comment
-        |// comment
-        |comment
-        |*/
-      """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
-
-    // Make sure an incomplete statement is still returned as incomplete.
-    response = interpreter.execute("sc.")
-    response should equal(Interpreter.ExecuteIncomplete())
-
-    // An incomplete statement followed by a comment should still be returned as incomplete.
-    response = interpreter.execute(
-      """sc.
-        |// comment
-      """.stripMargin)
-    response should equal(Interpreter.ExecuteIncomplete())
-
-    // Make sure the comment handling doesn't mangle a string whose value looks like comments.
-    val tripleQuotes = "\"\"\""
-    val stringWithComment = s"/*\ncomment\n*/\n//comment"
-    response = interpreter.execute(s"val r = $tripleQuotes$stringWithComment$tripleQuotes")
-
-    try {
-      response should equal(
-        Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String = \n$stringWithComment"))
-    } catch {
-      case _: Exception =>
-        response should equal(
-          // Scala 2.11 doesn't have a " " after "="
-          Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String =\n$stringWithComment"))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/SessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SessionSpec.scala
deleted file mode 100644
index 92d2322..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/SessionSpec.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import java.util.Properties
-import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
-
-import org.mockito.Mockito.when
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.FunSpec
-import org.scalatest.Matchers._
-import org.scalatest.concurrent.Eventually
-import org.scalatest.mock.MockitoSugar.mock
-import org.scalatest.time._
-
-import com.cloudera.livy.LivyBaseUnitTestSuite
-import com.cloudera.livy.repl.Interpreter.ExecuteResponse
-import com.cloudera.livy.rsc.RSCConf
-
-class SessionSpec extends FunSpec with Eventually with LivyBaseUnitTestSuite {
-  override implicit val patienceConfig =
-    PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(100, Millis)))
-
-  private val rscConf = new RSCConf(new Properties())
-
-  describe("Session") {
-    it("should call state changed callbacks in happy path") {
-      val expectedStateTransitions =
-        Array("not_started", "starting", "idle", "busy", "idle", "busy", "idle")
-      val actualStateTransitions = new ConcurrentLinkedQueue[String]()
-
-      val interpreter = mock[Interpreter]
-      when(interpreter.kind).thenAnswer(new Answer[String] {
-        override def answer(invocationOnMock: InvocationOnMock): String = "spark"
-      })
-
-      val session =
-        new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
-
-      session.start()
-
-      session.execute("")
-
-      eventually {
-        actualStateTransitions.toArray shouldBe expectedStateTransitions
-      }
-    }
-
-    it("should not transit to idle if there're any pending statements.") {
-      val expectedStateTransitions =
-        Array("not_started", "busy", "busy", "busy", "idle", "busy", "idle")
-      val actualStateTransitions = new ConcurrentLinkedQueue[String]()
-
-      val interpreter = mock[Interpreter]
-      when(interpreter.kind).thenAnswer(new Answer[String] {
-        override def answer(invocationOnMock: InvocationOnMock): String = "spark"
-      })
-
-      val blockFirstExecuteCall = new CountDownLatch(1)
-      when(interpreter.execute("")).thenAnswer(new Answer[Interpreter.ExecuteResponse] {
-        override def answer(invocation: InvocationOnMock): ExecuteResponse = {
-          blockFirstExecuteCall.await(10, TimeUnit.SECONDS)
-          null
-        }
-      })
-      val session =
-        new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
-
-      for (_ <- 1 to 2) {
-        session.execute("")
-      }
-
-      blockFirstExecuteCall.countDown()
-      eventually {
-        actualStateTransitions.toArray shouldBe expectedStateTransitions
-      }
-    }
-
-    it("should remove old statements when reaching threshold") {
-      val interpreter = mock[Interpreter]
-      when(interpreter.kind).thenAnswer(new Answer[String] {
-        override def answer(invocationOnMock: InvocationOnMock): String = "spark"
-      })
-
-      rscConf.set(RSCConf.Entry.RETAINED_STATEMENT_NUMBER, 2)
-      val session = new Session(rscConf, interpreter)
-      session.start()
-
-      session.statements.size should be (0)
-      session.execute("")
-      session.statements.size should be (1)
-      session.statements.map(_._1).toSet should be (Set(0))
-      session.execute("")
-      session.statements.size should be (2)
-      session.statements.map(_._1).toSet should be (Set(0, 1))
-      session.execute("")
-      eventually {
-        session.statements.size should be (2)
-        session.statements.map(_._1).toSet should be (Set(1, 2))
-      }
-
-    // Continue submitting statements; the number retained in memory should stay at 2.
-      session.execute("")
-      eventually {
-        session.statements.size should be (2)
-        session.statements.map(_._1).toSet should be (Set(2, 3))
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
deleted file mode 100644
index bde49b1..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import org.apache.spark.SparkConf
-import org.json4s.{DefaultFormats, JValue}
-import org.json4s.JsonDSL._
-import org.scalatest._
-
-import com.cloudera.livy.rsc.RSCConf
-
-class SparkRInterpreterSpec extends BaseInterpreterSpec {
-
-  implicit val formats = DefaultFormats
-
-  override protected def withFixture(test: NoArgTest): Outcome = {
-    assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
-    super.withFixture(test)
-  }
-
-  override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
-
-  it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
-    val response = interpreter.execute("1 + 2")
-    response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "[1] 3"
-    ))
-  }
-
-  it should "execute multiple statements" in withInterpreter { interpreter =>
-    var response = interpreter.execute("x = 1")
-    response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> ""
-    ))
-
-    response = interpreter.execute("y = 2")
-    response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> ""
-    ))
-
-    response = interpreter.execute("x + y")
-    response should equal (Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "[1] 3"
-    ))
-  }
-
-  it should "execute multiple statements in one block" in withInterpreter { interpreter =>
-    val response = interpreter.execute(
-      """
-        |x = 1
-        |
-        |y = 2
-        |
-        |x + y
-      """.stripMargin)
-    response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "[1] 3"
-    ))
-  }
-
-  it should "capture stdout" in withInterpreter { interpreter =>
-    val response = interpreter.execute("cat(3)")
-    response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "3"
-    ))
-  }
-
-  it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
-    val response = interpreter.execute("x")
-    assert(response.isInstanceOf[Interpreter.ExecuteError])
-    val errorResponse = response.asInstanceOf[Interpreter.ExecuteError]
-    errorResponse.ename should be ("Error")
-    assert(errorResponse.evalue.contains("object 'x' not found"))
-  }
-
-
-  it should "not hang when executing incomplete statements" in withInterpreter { interpreter =>
-    val response = interpreter.execute("x[")
-    response should equal(Interpreter.ExecuteError(
-      "Error",
-        """[1] "Error in parse(text = \"x[\"): <text>:2:0: unexpected end of input\n1: x[\n   ^""""
-    ))
-  }
-
-  it should "escape the statement" in withInterpreter { interpreter =>
-    val response = interpreter.execute("print(\"a\")")
-    response should equal(Interpreter.ExecuteSuccess(
-      TEXT_PLAIN -> "[1] \"a\""
-    ))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
deleted file mode 100644
index 2ca4130..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import org.apache.spark.SparkConf
-import org.json4s.Extraction
-import org.json4s.jackson.JsonMethods.parse
-
-import com.cloudera.livy.rsc.RSCConf
-
-class SparkRSessionSpec extends BaseSessionSpec {
-
-  override protected def withFixture(test: NoArgTest) = {
-    assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
-    super.withFixture(test)
-  }
-
-  override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
-
-  it should "execute `1 + 2` == 3" in withSession { session =>
-    val statement = execute(session)("1 + 2")
-    statement.id should equal(0)
-
-    val result = parse(statement.output)
-    val expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 0,
-      "data" -> Map(
-        "text/plain" -> "[1] 3"
-      )
-    ))
-
-    result should equal(expectedResult)
-  }
-
-  it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
-    val executeWithSession = execute(session)(_)
-    var statement = executeWithSession("x = 1")
-    statement.id should equal (0)
-
-    var result = parse(statement.output)
-    var expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 0,
-      "data" -> Map(
-        "text/plain" -> ""
-      )
-    ))
-
-    result should equal (expectedResult)
-
-    statement = executeWithSession("y = 2")
-    statement.id should equal (1)
-
-    result = parse(statement.output)
-    expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 1,
-      "data" -> Map(
-        "text/plain" -> ""
-      )
-    ))
-
-    result should equal (expectedResult)
-
-    statement = executeWithSession("x + y")
-    statement.id should equal (2)
-
-    result = parse(statement.output)
-    expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 2,
-      "data" -> Map(
-        "text/plain" -> "[1] 3"
-      )
-    ))
-
-    result should equal (expectedResult)
-  }
-
-  it should "capture stdout from print" in withSession { session =>
-    val statement = execute(session)("""print('Hello World')""")
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-    val expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 0,
-      "data" -> Map(
-        "text/plain" -> "[1] \"Hello World\""
-      )
-    ))
-
-    result should equal (expectedResult)
-  }
-
-  it should "capture stdout from cat" in withSession { session =>
-    val statement = execute(session)("""cat(3)""")
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-    val expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 0,
-      "data" -> Map(
-        "text/plain" -> "3"
-      )
-    ))
-
-    result should equal (expectedResult)
-  }
-
-  it should "report an error if accessing an unknown variable" in withSession { session =>
-    val statement = execute(session)("""x""")
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-    (result \ "status").extract[String] should be ("error")
-    (result \ "execution_count").extract[Int] should be (0)
-    (result \ "ename").extract[String] should be ("Error")
-    assert((result \ "evalue").extract[String].contains("object 'x' not found"))
-    (result \ "traceback").extract[List[String]] should be (List())
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
deleted file mode 100644
index 1b2be30..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.apache.spark.SparkConf
-import org.json4s.Extraction
-import org.json4s.JsonAST.JValue
-import org.json4s.jackson.JsonMethods.parse
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy.rsc.RSCConf
-import com.cloudera.livy.rsc.driver.StatementState
-
-class SparkSessionSpec extends BaseSessionSpec {
-
-  override def createInterpreter(): Interpreter = new SparkInterpreter(new SparkConf())
-
-  it should "execute `1 + 2` == 3" in withSession { session =>
-    val statement = execute(session)("1 + 2")
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-    val expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 0,
-      "data" -> Map(
-        "text/plain" -> "res0: Int = 3"
-      )
-    ))
-
-    result should equal (expectedResult)
-  }
-
-  it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
-    val executeWithSession = execute(session)(_)
-    var statement = executeWithSession("val x = 1")
-    statement.id should equal (0)
-
-    var result = parse(statement.output)
-    var expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 0,
-      "data" -> Map(
-        "text/plain" -> "x: Int = 1"
-      )
-    ))
-
-    result should equal (expectedResult)
-
-    statement = executeWithSession("val y = 2")
-    statement.id should equal (1)
-
-    result = parse(statement.output)
-    expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 1,
-      "data" -> Map(
-        "text/plain" -> "y: Int = 2"
-      )
-    ))
-
-    result should equal (expectedResult)
-
-    statement = executeWithSession("x + y")
-    statement.id should equal (2)
-
-    result = parse(statement.output)
-    expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 2,
-      "data" -> Map(
-        "text/plain" -> "res0: Int = 3"
-      )
-    ))
-
-    result should equal (expectedResult)
-  }
-
-  it should "capture stdout" in withSession { session =>
-    val statement = execute(session)("""println("Hello World")""")
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-    val expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 0,
-      "data" -> Map(
-        "text/plain" -> "Hello World"
-      )
-    ))
-
-    result should equal (expectedResult)
-  }
-
-  it should "report an error if accessing an unknown variable" in withSession { session =>
-    val statement = execute(session)("""x""")
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-
-    def extract(key: String): String = (result \ key).extract[String]
-
-    extract("status") should equal ("error")
-    extract("execution_count") should equal ("0")
-    extract("ename") should equal ("Error")
-    extract("evalue") should include ("error: not found: value x")
-  }
-
-  it should "report an error if exception is thrown" in withSession { session =>
-    val statement = execute(session)(
-      """def func1() {
-        |throw new Exception()
-        |}
-        |func1()""".stripMargin)
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-    val resultMap = result.extract[Map[String, JValue]]
-
-    // Manually extract the values since the line numbers in the exception could change.
-    resultMap("status").extract[String] should equal ("error")
-    resultMap("execution_count").extract[Int] should equal (0)
-    resultMap("ename").extract[String] should equal ("Error")
-    resultMap("evalue").extract[String] should include ("java.lang.Exception")
-
-    val traceback = resultMap("traceback").extract[Seq[String]]
-    traceback(0) should include ("func1(<console>:")
-  }
-
-  it should "access the spark context" in withSession { session =>
-    val statement = execute(session)("""sc""")
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-    val resultMap = result.extract[Map[String, JValue]]
-
-    // Manually extract the values since the context's string representation can vary.
-    resultMap("status").extract[String] should equal ("ok")
-    resultMap("execution_count").extract[Int] should equal (0)
-
-    val data = resultMap("data").extract[Map[String, JValue]]
-    data("text/plain").extract[String] should include (
-      "res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext")
-  }
-
-  it should "execute spark commands" in withSession { session =>
-    val statement = execute(session)(
-      """sc.parallelize(0 to 1).map{i => i+1}.collect""".stripMargin)
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-
-    val expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 0,
-      "data" -> Map(
-        "text/plain" -> "res0: Array[Int] = Array(1, 2)"
-      )
-    ))
-
-    result should equal (expectedResult)
-  }
-
-  it should "do table magic" in withSession { session =>
-    val statement = execute(session)("val x = List((1, \"a\"), (3, \"b\"))\n%table x")
-    statement.id should equal (0)
-
-    val result = parse(statement.output)
-
-    val expectedResult = Extraction.decompose(Map(
-      "status" -> "ok",
-      "execution_count" -> 0,
-      "data" -> Map(
-        "application/vnd.livy.table.v1+json" -> Map(
-          "headers" -> List(
-            Map("type" -> "BIGINT_TYPE", "name" -> "_1"),
-            Map("type" -> "STRING_TYPE", "name" -> "_2")),
-          "data" -> List(List(1, "a"), List(3, "b"))
-        )
-      )
-    ))
-
-    result should equal (expectedResult)
-  }
-
-  it should "cancel spark jobs" in withSession { session =>
-    val stmtId = session.execute(
-      """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      assert(session.statements(stmtId).state.get() == StatementState.Running)
-    }
-    session.cancel(stmtId)
-
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      assert(session.statements(stmtId).state.get() == StatementState.Cancelled)
-      session.statements(stmtId).output should include (
-        "Job 0 cancelled part of cancelled job group 0")
-    }
-  }
-
-  it should "cancel waiting statement" in withSession { session =>
-    val stmtId1 = session.execute(
-      """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
-    val stmtId2 = session.execute(
-      """sc.parallelize(0 to 10).map { i => Thread.sleep(10000); i + 1 }.collect""".stripMargin)
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      assert(session.statements(stmtId1).state.get() == StatementState.Running)
-    }
-
-    assert(session.statements(stmtId2).state.get() == StatementState.Waiting)
-
-    session.cancel(stmtId2)
-    assert(session.statements(stmtId2).state.get() == StatementState.Cancelled)
-
-    session.cancel(stmtId1)
-    assert(session.statements(stmtId1).state.get() == StatementState.Cancelling)
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      assert(session.statements(stmtId1).state.get() == StatementState.Cancelled)
-      session.statements(stmtId1).output should include (
-        "Job 0 cancelled part of cancelled job group 0")
-    }
-  }
-
-  it should "correctly calculate progress" in withSession { session =>
-    val executeCode =
-      """
-        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
-      """.stripMargin
-
-    val stmtId = session.execute(executeCode)
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      session.progressOfStatement(stmtId) should be(1.0)
-    }
-  }
-
-  it should "not generate Spark jobs for plain Scala code" in withSession { session =>
-    val executeCode = """1 + 1"""
-
-    val stmtId = session.execute(executeCode)
-    session.progressOfStatement(stmtId) should be (0.0)
-  }
-
-  it should "handle multiple jobs in one statement" in withSession { session =>
-    val executeCode =
-      """
-        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
-        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
-      """.stripMargin
-
-    val stmtId = session.execute(executeCode)
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      session.progressOfStatement(stmtId) should be(1.0)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/BaseInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/BaseInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/BaseInterpreterSpec.scala
new file mode 100644
index 0000000..b3d4d5a
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/BaseInterpreterSpec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.livy.LivyBaseUnitTestSuite
+
+abstract class BaseInterpreterSpec extends FlatSpec with Matchers with LivyBaseUnitTestSuite {
+
+  def createInterpreter(): Interpreter
+
+  def withInterpreter(testCode: Interpreter => Any): Unit = {
+    val interpreter = createInterpreter()
+    try {
+      interpreter.start()
+      testCode(interpreter)
+    } finally {
+      interpreter.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
new file mode 100644
index 0000000..a13f924
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.json4s._
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.LivyBaseUnitTestSuite
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.rsc.driver.{Statement, StatementState}
+import org.apache.livy.sessions.SessionState
+
+abstract class BaseSessionSpec extends FlatSpec with Matchers with LivyBaseUnitTestSuite {
+
+  implicit val formats = DefaultFormats
+
+  private val rscConf = new RSCConf(new Properties())
+
+  protected def execute(session: Session)(code: String): Statement = {
+    val id = session.execute(code)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      val s = session.statements(id)
+      s.state.get() shouldBe StatementState.Available
+      s
+    }
+  }
+
+  protected def withSession(testCode: Session => Any): Unit = {
+    val stateChangedCalled = new AtomicInteger()
+    val session =
+      new Session(rscConf, createInterpreter(), { _ => stateChangedCalled.incrementAndGet() })
+    try {
+      // Session's constructor should fire an initial state change event.
+      stateChangedCalled.intValue() shouldBe 1
+      Await.ready(session.start(), 30 seconds)
+      assert(session.state === SessionState.Idle())
+      // There should be at least 1 state change event fired when the session transitions to idle.
+      stateChangedCalled.intValue() should (be > 1)
+      testCode(session)
+    } finally {
+      session.close()
+    }
+  }
+
+  protected def createInterpreter(): Interpreter
+
+  it should "start in the starting or idle state" in {
+    val session = new Session(rscConf, createInterpreter())
+    val future = session.start()
+    try {
+      eventually(timeout(30 seconds), interval(100 millis)) {
+        session.state should (equal (SessionState.Starting()) or equal (SessionState.Idle()))
+      }
+      Await.ready(future, 60 seconds)
+    } finally {
+      session.close()
+    }
+  }
+
+  it should "eventually become the idle state" in withSession { session =>
+    session.state should equal (SessionState.Idle())
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
new file mode 100644
index 0000000..1404765
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.{DefaultFormats, JNull, JValue}
+import org.json4s.JsonDSL._
+import org.scalatest._
+
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.sessions._
+
+abstract class PythonBaseInterpreterSpec extends BaseInterpreterSpec {
+
+  it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
+    val response = interpreter.execute("1 + 2")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "3"
+    ))
+  }
+
+  it should "execute multiple statements" in withInterpreter { interpreter =>
+    var response = interpreter.execute("x = 1")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> ""
+    ))
+
+    response = interpreter.execute("y = 2")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> ""
+    ))
+
+    response = interpreter.execute("x + y")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "3"
+    ))
+  }
+
+  it should "execute multiple statements in one block" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """
+        |x = 1
+        |
+        |y = 2
+        |
+        |x + y
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "3"
+    ))
+  }
+
+  it should "parse a class" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """
+        |class Counter(object):
+        |   def __init__(self):
+        |       self.count = 0
+        |
+        |   def add_one(self):
+        |       self.count += 1
+        |
+        |   def add_two(self):
+        |       self.count += 2
+        |
+        |counter = Counter()
+        |counter.add_one()
+        |counter.add_two()
+        |counter.count
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "3"
+    ))
+  }
+
+  it should "do json magic" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """x = [[1, 'a'], [3, 'b']]
+        |%json x
+      """.stripMargin)
+
+    response should equal(Interpreter.ExecuteSuccess(
+      APPLICATION_JSON -> List[JValue](
+        List[JValue](1, "a"),
+        List[JValue](3, "b")
+      )
+    ))
+  }
+
+  it should "do table magic" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """x = [[1, 'a'], [3, 'b']]
+        |%table x
+      """.stripMargin)
+
+    response should equal(Interpreter.ExecuteSuccess(
+      APPLICATION_LIVY_TABLE_JSON -> (
+        ("headers" -> List(
+          ("type" -> "INT_TYPE") ~ ("name" -> "0"),
+          ("type" -> "STRING_TYPE") ~ ("name" -> "1")
+        )) ~
+          ("data" -> List(
+            List[JValue](1, "a"),
+            List[JValue](3, "b")
+          ))
+        )
+    ))
+  }
+
+  it should "do table magic with None type value" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """x = [{"a":"1", "b":None}, {"a":"2", "b":2}]
+        |%table x
+      """.stripMargin)
+
+    response should equal(Interpreter.ExecuteSuccess(
+      APPLICATION_LIVY_TABLE_JSON -> (
+        ("headers" -> List(
+          ("type" -> "STRING_TYPE") ~ ("name" -> "a"),
+          ("type" -> "INT_TYPE") ~ ("name" -> "b")
+        )) ~
+          ("data" -> List(
+            List[JValue]("1", JNull),
+            List[JValue]("2", 2)
+          ))
+        )
+    ))
+  }
+
+  it should "do table magic with None type Row" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """x = [{"a":None, "b":None}, {"a":"2", "b":2}]
+        |%table x
+      """.stripMargin)
+
+    response should equal(Interpreter.ExecuteSuccess(
+      APPLICATION_LIVY_TABLE_JSON -> (
+        ("headers" -> List(
+          ("type" -> "STRING_TYPE") ~ ("name" -> "a"),
+          ("type" -> "INT_TYPE") ~ ("name" -> "b")
+        )) ~
+          ("data" -> List(
+            List[JValue](JNull, JNull),
+            List[JValue]("2", 2)
+          ))
+        )
+    ))
+  }
+
+  it should "allow magic inside statements" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """x = [[1, 'a'], [3, 'b']]
+        |%table x
+        |1 + 2
+      """.stripMargin)
+
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "3"
+    ))
+  }
+
+  it should "capture stdout" in withInterpreter { interpreter =>
+    val response = interpreter.execute("print('Hello World')")
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "Hello World"
+    ))
+  }
+
+  it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
+    val response = interpreter.execute("x")
+    response should equal(Interpreter.ExecuteError(
+      "NameError",
+      "name 'x' is not defined",
+      List(
+        "Traceback (most recent call last):\n",
+        "NameError: name 'x' is not defined\n"
+      )
+    ))
+  }
+
+  it should "report an error if empty magic command" in withInterpreter { interpreter =>
+    val response = interpreter.execute("%")
+    response should equal(Interpreter.ExecuteError(
+      "UnknownMagic",
+      "magic command not specified",
+      List("UnknownMagic: magic command not specified\n")
+    ))
+  }
+
+  it should "report an error if unknown magic command" in withInterpreter { interpreter =>
+    val response = interpreter.execute("%foo")
+    response should equal(Interpreter.ExecuteError(
+      "UnknownMagic",
+      "unknown magic command 'foo'",
+      List("UnknownMagic: unknown magic command 'foo'\n")
+    ))
+  }
+
+  it should "not execute part of the block if there is a syntax error" in withInterpreter { intp =>
+    var response = intp.execute(
+      """x = 1
+        |'
+      """.stripMargin)
+
+    response should equal(Interpreter.ExecuteError(
+      "SyntaxError",
+      "EOL while scanning string literal (<stdin>, line 2)",
+      List(
+        "  File \"<stdin>\", line 2\n",
+        "    '\n",
+        "    ^\n",
+        "SyntaxError: EOL while scanning string literal\n"
+      )
+    ))
+
+    response = intp.execute("x")
+    response should equal(Interpreter.ExecuteError(
+      "NameError",
+      "name 'x' is not defined",
+      List(
+        "Traceback (most recent call last):\n",
+        "NameError: name 'x' is not defined\n"
+      )
+    ))
+  }
+}
+
+class Python2InterpreterSpec extends PythonBaseInterpreterSpec {
+
+  implicit val formats = DefaultFormats
+
+  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark())
+
+  // Scalastyle treats unicode escapes as non-ASCII characters. Turn off the check.
+  // scalastyle:off non.ascii.character.disallowed
+  it should "print unicode correctly" in withInterpreter { intp =>
+    intp.execute("print(u\"\u263A\")") should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "\u263A"
+    ))
+    intp.execute("""print(u"\u263A")""") should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "\u263A"
+    ))
+    intp.execute("""print("\xE2\x98\xBA")""") should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "\u263A"
+    ))
+  }
+  // scalastyle:on non.ascii.character.disallowed
+}
+
+class Python3InterpreterSpec extends PythonBaseInterpreterSpec {
+
+  implicit val formats = DefaultFormats
+
+  override protected def withFixture(test: NoArgTest): Outcome = {
+    assume(!sys.props.getOrElse("skipPySpark3Tests", "false").toBoolean, "Skipping PySpark3 tests.")
+    test()
+  }
+
+  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark3())
+
+  it should "check python version is 3.x" in withInterpreter { interpreter =>
+    val response = interpreter.execute("""import sys
+      |sys.version >= '3'
+      """.stripMargin)
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "True"
+    ))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
new file mode 100644
index 0000000..0883f27
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.Extraction
+import org.json4s.jackson.JsonMethods.parse
+import org.scalatest._
+
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.sessions._
+
+abstract class PythonSessionSpec extends BaseSessionSpec {
+
+  it should "execute `1 + 2` == 3" in withSession { session =>
+    val statement = execute(session)("1 + 2")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "3"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
+    val executeWithSession = execute(session)(_)
+    var statement = executeWithSession("x = 1")
+    statement.id should equal (0)
+
+    var result = parse(statement.output)
+    var expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> ""
+      )
+    ))
+
+    result should equal (expectedResult)
+
+    statement = executeWithSession("y = 2")
+    statement.id should equal (1)
+
+    result = parse(statement.output)
+    expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 1,
+      "data" -> Map(
+        "text/plain" -> ""
+      )
+    ))
+
+    result should equal (expectedResult)
+
+    statement = executeWithSession("x + y")
+    statement.id should equal (2)
+
+    result = parse(statement.output)
+    expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 2,
+      "data" -> Map(
+        "text/plain" -> "3"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "do table magic" in withSession { session =>
+    val statement = execute(session)("x = [[1, 'a'], [3, 'b']]\n%table x")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "application/vnd.livy.table.v1+json" -> Map(
+          "headers" -> List(
+            Map("type" -> "INT_TYPE", "name" -> "0"),
+            Map("type" -> "STRING_TYPE", "name" -> "1")),
+          "data" -> List(List(1, "a"), List(3, "b"))
+        )
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "capture stdout" in withSession { session =>
+    val statement = execute(session)("""print('Hello World')""")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "Hello World"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "report an error if accessing an unknown variable" in withSession { session =>
+    val statement = execute(session)("""x""")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "error",
+      "execution_count" -> 0,
+      "traceback" -> List(
+        "Traceback (most recent call last):\n",
+        "NameError: name 'x' is not defined\n"
+      ),
+      "ename" -> "NameError",
+      "evalue" -> "name 'x' is not defined"
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "report an error if exception is thrown" in withSession { session =>
+    val statement = execute(session)(
+      """def func1():
+        |  raise Exception("message")
+        |def func2():
+        |  func1()
+        |func2()
+      """.stripMargin)
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "error",
+      "execution_count" -> 0,
+      "traceback" -> List(
+        "Traceback (most recent call last):\n",
+        "  File \"<stdin>\", line 4, in func2\n",
+        "  File \"<stdin>\", line 2, in func1\n",
+        "Exception: message\n"
+      ),
+      "ename" -> "Exception",
+      "evalue" -> "message"
+    ))
+
+    result should equal (expectedResult)
+  }
+}
+
+class Python2SessionSpec extends PythonSessionSpec {
+  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark())
+}
+
+class Python3SessionSpec extends PythonSessionSpec {
+
+  override protected def withFixture(test: NoArgTest): Outcome = {
+    assume(!sys.props.getOrElse("skipPySpark3Tests", "false").toBoolean, "Skipping PySpark3 tests.")
+    test()
+  }
+
+  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark3())
+
+  it should "check python version is 3.x" in withSession { session =>
+    val statement = execute(session)(
+      """import sys
+      |sys.version >= '3'
+      """.stripMargin)
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "True"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala b/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
new file mode 100644
index 0000000..6537f0c
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import java.net.URI
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.spark.launcher.SparkLauncher
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy._
+import org.apache.livy.rsc.{PingJob, RSCClient, RSCConf}
+import org.apache.livy.sessions.Spark
+
+class ReplDriverSuite extends FunSuite with LivyBaseUnitTestSuite {
+
+  private implicit val formats = DefaultFormats
+
+  test("start a repl session using the rsc") {
+    val client = new LivyClientBuilder()
+      .setConf(SparkLauncher.DRIVER_MEMORY, "512m")
+      .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, sys.props("java.class.path"))
+      .setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, sys.props("java.class.path"))
+      .setConf(RSCConf.Entry.LIVY_JARS.key(), "")
+      .setURI(new URI("rsc:/"))
+      .setConf(RSCConf.Entry.DRIVER_CLASS.key(), classOf[ReplDriver].getName())
+      .setConf(RSCConf.Entry.SESSION_KIND.key(), Spark().toString)
+      .build()
+      .asInstanceOf[RSCClient]
+
+    try {
+      // This is sort of what InteractiveSession.scala does to detect an idle session.
+      client.submit(new PingJob()).get(60, TimeUnit.SECONDS)
+
+      val statementId = client.submitReplCode("1 + 1").get
+      eventually(timeout(30 seconds), interval(100 millis)) {
+        val rawResult =
+          client.getReplJobResults(statementId, 1).get(10, TimeUnit.SECONDS).statements(0)
+        val result = rawResult.output
+        assert((parse(result) \ Session.STATUS).extract[String] === Session.OK)
+      }
+    } finally {
+      client.stop(true)
+    }
+  }
+
+}

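ReplDriverSuite above leans on ScalaTest's eventually to poll for the statement result: the block is re-run at the given interval until it stops throwing, or the call fails once the timeout elapses. A minimal self-contained sketch of that idiom (the polled counter is illustrative):

    import scala.concurrent.duration._
    import scala.language.postfixOps

    import org.scalatest.Assertions._
    import org.scalatest.concurrent.Eventually._

    object EventuallyExample extends App {
      var attempts = 0
      // Re-run the block every 50 ms until the assertion passes,
      // giving up after 5 seconds.
      eventually(timeout(5 seconds), interval(50 millis)) {
        attempts += 1
        assert(attempts >= 3)
      }
      println(s"passed after $attempts attempts")
    }
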
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
new file mode 100644
index 0000000..3e9ee82
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/ScalaInterpreterSpec.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.{DefaultFormats, JValue}
+import org.json4s.JsonDSL._
+
+import org.apache.livy.rsc.RSCConf
+
+class ScalaInterpreterSpec extends BaseInterpreterSpec {
+
+  implicit val formats = DefaultFormats
+
+  override def createInterpreter(): Interpreter =
+    new SparkInterpreter(new SparkConf())
+
+  it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
+    val response = interpreter.execute("1 + 2")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "res0: Int = 3"
+    ))
+  }
+
+  it should "execute multiple statements" in withInterpreter { interpreter =>
+    var response = interpreter.execute("val x = 1")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "x: Int = 1"
+    ))
+
+    response = interpreter.execute("val y = 2")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "y: Int = 2"
+    ))
+
+    response = interpreter.execute("x + y")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "res0: Int = 3"
+    ))
+  }
+
+  it should "execute multiple statements in one block" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """
+        |val x = 1
+        |
+        |val y = 2
+        |
+        |x + y
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "res2: Int = 3"
+    ))
+  }
+
+  it should "do table magic" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """val x = List(List(1, "a"), List(3, "b"))
+        |%table x
+      """.stripMargin)
+
+    response should equal(Interpreter.ExecuteSuccess(
+      APPLICATION_LIVY_TABLE_JSON -> (
+        ("headers" -> List(
+          ("type" -> "BIGINT_TYPE") ~ ("name" -> "0"),
+          ("type" -> "STRING_TYPE") ~ ("name" -> "1")
+        )) ~
+          ("data" -> List(
+            List[JValue](1, "a"),
+            List[JValue](3, "b")
+          ))
+        )
+    ))
+  }
+
+  it should "allow magic inside statements" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """val x = List(List(1, "a"), List(3, "b"))
+        |%table x
+        |1 + 2
+      """.stripMargin)
+
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "res0: Int = 3"
+    ))
+  }
+
+  it should "capture stdout" in withInterpreter { interpreter =>
+    val response = interpreter.execute("println(\"Hello World\")")
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "Hello World"
+    ))
+  }
+
+  it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
+    interpreter.execute("x") match {
+      case Interpreter.ExecuteError(ename, evalue, _) =>
+        ename should equal ("Error")
+        evalue should include ("error: not found: value x")
+
+      case other =>
+        fail(s"Expected error, got $other.")
+    }
+  }
+
+  it should "execute spark commands" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """sc.parallelize(0 to 1).map { i => i+1 }.collect""".stripMargin)
+
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "res0: Array[Int] = Array(1, 2)"
+    ))
+  }
+
+  it should "handle statements ending with comments" in withInterpreter { interpreter =>
+    // Test statements with only comments
+    var response = interpreter.execute("""// comment""")
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> ""))
+
+    response = interpreter.execute(
+      """/*
+        |comment
+        |*/
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> ""))
+
+    // Test statements ending with comments
+    response = interpreter.execute(
+      """val r = 1
+        |// comment
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+
+    response = interpreter.execute(
+      """val r = 1
+        |/*
+        |comment
+        |comment
+        |*/
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+
+    // Test statements ending with a mix of single line and multi-line comments
+    response = interpreter.execute(
+      """val r = 1
+        |// comment
+        |/*
+        |comment
+        |comment
+        |*/
+        |// comment
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+
+    response = interpreter.execute(
+      """val r = 1
+        |/*
+        |comment
+        |// comment
+        |comment
+        |*/
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(TEXT_PLAIN -> "r: Int = 1"))
+
+    // Make sure an incomplete statement is still returned as incomplete.
+    response = interpreter.execute("sc.")
+    response should equal(Interpreter.ExecuteIncomplete())
+
+    // Make sure an incomplete statement ending with a comment is still returned as incomplete.
+    response = interpreter.execute(
+      """sc.
+        |// comment
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteIncomplete())
+
+    // Make sure the comment handling doesn't mangle a string whose value looks like comments.
+    val tripleQuotes = "\"\"\""
+    val stringWithComment = s"/*\ncomment\n*/\n//comment"
+    response = interpreter.execute(s"val r = $tripleQuotes$stringWithComment$tripleQuotes")
+
+    try {
+      response should equal(
+        Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String = \n$stringWithComment"))
+    } catch {
+      case _: Exception =>
+        response should equal(
+          // Scala 2.11 doesn't have a " " after "="
+          Interpreter.ExecuteSuccess(TEXT_PLAIN -> s"r: String =\n$stringWithComment"))
+    }
+  }
+}

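The %table expectation in ScalaInterpreterSpec is built with the json4s DSL: -> creates a field, ~ merges fields into a JSON object, and Scala primitives are lifted to JValues implicitly. A minimal sketch of that DSL outside the test:

    import org.json4s._
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    object JsonDslExample extends App {
      // `->` builds a single field; `~` merges fields into one JObject.
      val header: JObject = ("type" -> "BIGINT_TYPE") ~ ("name" -> "0")

      // Ints and Strings become JValues through JsonDSL's implicit conversions.
      val payload: JObject =
        ("headers" -> List(header)) ~ ("data" -> List(List[JValue](1, "a")))

      // compact(render(...)) serializes the tree to a JSON string.
      println(compact(render(payload)))
      // {"headers":[{"type":"BIGINT_TYPE","name":"0"}],"data":[[1,"a"]]}
    }
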
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
new file mode 100644
index 0000000..090b7cb
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import java.util.Properties
+import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
+
+import org.mockito.Mockito.when
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+import org.scalatest.concurrent.Eventually
+import org.scalatest.mock.MockitoSugar.mock
+import org.scalatest.time._
+
+import org.apache.livy.LivyBaseUnitTestSuite
+import org.apache.livy.repl.Interpreter.ExecuteResponse
+import org.apache.livy.rsc.RSCConf
+
+class SessionSpec extends FunSpec with Eventually with LivyBaseUnitTestSuite {
+  override implicit val patienceConfig =
+    PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(100, Millis)))
+
+  private val rscConf = new RSCConf(new Properties())
+
+  describe("Session") {
+    it("should call state changed callbacks in happy path") {
+      val expectedStateTransitions =
+        Array("not_started", "starting", "idle", "busy", "idle", "busy", "idle")
+      val actualStateTransitions = new ConcurrentLinkedQueue[String]()
+
+      val interpreter = mock[Interpreter]
+      when(interpreter.kind).thenAnswer(new Answer[String] {
+        override def answer(invocationOnMock: InvocationOnMock): String = "spark"
+      })
+
+      val session =
+        new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
+
+      session.start()
+
+      session.execute("")
+
+      eventually {
+        actualStateTransitions.toArray shouldBe expectedStateTransitions
+      }
+    }
+
+    it("should not transit to idle if there're any pending statements.") {
+      val expectedStateTransitions =
+        Array("not_started", "busy", "busy", "busy", "idle", "busy", "idle")
+      val actualStateTransitions = new ConcurrentLinkedQueue[String]()
+
+      val interpreter = mock[Interpreter]
+      when(interpreter.kind).thenAnswer(new Answer[String] {
+        override def answer(invocationOnMock: InvocationOnMock): String = "spark"
+      })
+
+      val blockFirstExecuteCall = new CountDownLatch(1)
+      when(interpreter.execute("")).thenAnswer(new Answer[Interpreter.ExecuteResponse] {
+        override def answer(invocation: InvocationOnMock): ExecuteResponse = {
+          blockFirstExecuteCall.await(10, TimeUnit.SECONDS)
+          null
+        }
+      })
+      val session =
+        new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
+
+      for (_ <- 1 to 2) {
+        session.execute("")
+      }
+
+      blockFirstExecuteCall.countDown()
+      eventually {
+        actualStateTransitions.toArray shouldBe expectedStateTransitions
+      }
+    }
+
+    it("should remove old statements when reaching threshold") {
+      val interpreter = mock[Interpreter]
+      when(interpreter.kind).thenAnswer(new Answer[String] {
+        override def answer(invocationOnMock: InvocationOnMock): String = "spark"
+      })
+
+      rscConf.set(RSCConf.Entry.RETAINED_STATEMENT_NUMBER, 2)
+      val session = new Session(rscConf, interpreter)
+      session.start()
+
+      session.statements.size should be (0)
+      session.execute("")
+      session.statements.size should be (1)
+      session.statements.map(_._1).toSet should be (Set(0))
+      session.execute("")
+      session.statements.size should be (2)
+      session.statements.map(_._1).toSet should be (Set(0, 1))
+      session.execute("")
+      eventually {
+        session.statements.size should be (2)
+        session.statements.map(_._1).toSet should be (Set(1, 2))
+      }
+
+      // Continue submitting statements; the total retained in memory should stay at 2.
+      session.execute("")
+      eventually {
+        session.statements.size should be (2)
+        session.statements.map(_._1).toSet should be (Set(2, 3))
+      }
+    }
+  }
+}

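The last SessionSpec test pins down the behavior behind RSCConf.Entry.RETAINED_STATEMENT_NUMBER: once the configured limit is reached, the oldest statement is dropped as new ones arrive. A minimal sketch of such an eviction policy (illustrative only, not Livy's implementation; StatementStore is a made-up name):

    import scala.collection.mutable

    // Keeps at most `retained` statements, evicting the oldest id first.
    class StatementStore(retained: Int) {
      private val statements = mutable.LinkedHashMap[Int, String]()
      private var nextId = 0

      def add(code: String): Int = {
        if (statements.size >= retained) {
          // LinkedHashMap preserves insertion order, so `head` is the oldest.
          statements.remove(statements.head._1)
        }
        val id = nextId
        nextId += 1
        statements(id) = code
        id
      }

      def ids: Set[Int] = statements.keySet.toSet
    }

    object StatementStoreExample extends App {
      val store = new StatementStore(retained = 2)
      (1 to 4).foreach(_ => store.add(""))
      assert(store.ids == Set(2, 3)) // ids 0 and 1 were evicted
    }
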
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
new file mode 100644
index 0000000..374afbc
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.{DefaultFormats, JValue}
+import org.json4s.JsonDSL._
+import org.scalatest._
+
+import org.apache.livy.rsc.RSCConf
+
+class SparkRInterpreterSpec extends BaseInterpreterSpec {
+
+  implicit val formats = DefaultFormats
+
+  override protected def withFixture(test: NoArgTest): Outcome = {
+    assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
+    super.withFixture(test)
+  }
+
+  override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
+
+  it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
+    val response = interpreter.execute("1 + 2")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "[1] 3"
+    ))
+  }
+
+  it should "execute multiple statements" in withInterpreter { interpreter =>
+    var response = interpreter.execute("x = 1")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> ""
+    ))
+
+    response = interpreter.execute("y = 2")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> ""
+    ))
+
+    response = interpreter.execute("x + y")
+    response should equal (Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "[1] 3"
+    ))
+  }
+
+  it should "execute multiple statements in one block" in withInterpreter { interpreter =>
+    val response = interpreter.execute(
+      """
+        |x = 1
+        |
+        |y = 2
+        |
+        |x + y
+      """.stripMargin)
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "[1] 3"
+    ))
+  }
+
+  it should "capture stdout" in withInterpreter { interpreter =>
+    val response = interpreter.execute("cat(3)")
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "3"
+    ))
+  }
+
+  it should "report an error if accessing an unknown variable" in withInterpreter { interpreter =>
+    val response = interpreter.execute("x")
+    assert(response.isInstanceOf[Interpreter.ExecuteError])
+    val errorResponse = response.asInstanceOf[Interpreter.ExecuteError]
+    errorResponse.ename should be ("Error")
+    assert(errorResponse.evalue.contains("object 'x' not found"))
+  }
+
+
+  it should "not hang when executing incomplete statements" in withInterpreter { interpreter =>
+    val response = interpreter.execute("x[")
+    response should equal(Interpreter.ExecuteError(
+      "Error",
+        """[1] "Error in parse(text = \"x[\"): <text>:2:0: unexpected end of input\n1: x[\n   ^""""
+    ))
+  }
+
+  it should "escape the statement" in withInterpreter { interpreter =>
+    val response = interpreter.execute("print(\"a\")")
+    response should equal(Interpreter.ExecuteSuccess(
+      TEXT_PLAIN -> "[1] \"a\""
+    ))
+  }
+
+}

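The final SparkRInterpreterSpec test checks that double quotes in a statement survive the round trip into R. A minimal sketch of the kind of escaping that requires when code is embedded in a single R string literal (illustrative; not the interpreter's actual code):

    object EscapeExample extends App {
      // Escape backslashes first, then double quotes, so the statement can be
      // embedded in an R string literal such as parse(text = "...").
      def escapeForRString(code: String): String =
        code.replace("\\", "\\\\").replace("\"", "\\\"")

      // print("a") becomes print(\"a\") once escaped.
      assert(escapeForRString("print(\"a\")") == "print(\\\"a\\\")")
    }
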
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
new file mode 100644
index 0000000..42ed60a
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import org.apache.spark.SparkConf
+import org.json4s.Extraction
+import org.json4s.jackson.JsonMethods.parse
+
+import org.apache.livy.rsc.RSCConf
+
+class SparkRSessionSpec extends BaseSessionSpec {
+
+  override protected def withFixture(test: NoArgTest) = {
+    assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
+    super.withFixture(test)
+  }
+
+  override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
+
+  it should "execute `1 + 2` == 3" in withSession { session =>
+    val statement = execute(session)("1 + 2")
+    statement.id should equal(0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "[1] 3"
+      )
+    ))
+
+    result should equal(expectedResult)
+  }
+
+  it should "execute `x = 1`, then `y = 2`, then `x + y`" in withSession { session =>
+    val executeWithSession = execute(session)(_)
+    var statement = executeWithSession("x = 1")
+    statement.id should equal (0)
+
+    var result = parse(statement.output)
+    var expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> ""
+      )
+    ))
+
+    result should equal (expectedResult)
+
+    statement = executeWithSession("y = 2")
+    statement.id should equal (1)
+
+    result = parse(statement.output)
+    expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 1,
+      "data" -> Map(
+        "text/plain" -> ""
+      )
+    ))
+
+    result should equal (expectedResult)
+
+    statement = executeWithSession("x + y")
+    statement.id should equal (2)
+
+    result = parse(statement.output)
+    expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 2,
+      "data" -> Map(
+        "text/plain" -> "[1] 3"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "capture stdout from print" in withSession { session =>
+    val statement = execute(session)("""print('Hello World')""")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "[1] \"Hello World\""
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "capture stdout from cat" in withSession { session =>
+    val statement = execute(session)("""cat(3)""")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "3"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "report an error if accessing an unknown variable" in withSession { session =>
+    val statement = execute(session)("""x""")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    (result \ "status").extract[String] should be ("error")
+    (result \ "execution_count").extract[Int] should be (0)
+    (result \ "ename").extract[String] should be ("Error")
+    assert((result \ "evalue").extract[String].contains("object 'x' not found"))
+    (result \ "traceback").extract[List[String]] should be (List())
+  }
+
+}
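
The error test above walks the statement output field by field instead of comparing whole JSON documents, since the R error text is environment-dependent. A minimal sketch of the json4s traversal it relies on (the JSON literal here is illustrative):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    object ExtractExample extends App {
      implicit val formats = DefaultFormats

      val result = parse(
        """{"status":"error","execution_count":0,"ename":"Error","traceback":[]}""")

      // `\` selects a field; extract[T] deserializes it via the implicit formats.
      assert((result \ "status").extract[String] == "error")
      assert((result \ "execution_count").extract[Int] == 0)
      assert((result \ "traceback").extract[List[String]].isEmpty)
    }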