You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:57 UTC

[28/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/RealCluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/RealCluster.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/RealCluster.scala
new file mode 100644
index 0000000..0acc580
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/RealCluster.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.framework
+
+import java.io.{File, IOException}
+import java.nio.file.Files
+import java.security.PrivilegedExceptionAction
+import javax.servlet.http.HttpServletResponse._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.sys.process._
+import scala.util.Random
+
+import com.decodified.scalassh._
+import com.ning.http.client.AsyncHttpClient
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.security.UserGroupInformation
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.{LivyConf, Logging}
+
+private class RealClusterConfig(config: Map[String, String]) {
+  val ip = config("ip")
+
+  val sshLogin = config("ssh.login")
+  val sshPubKey = config("ssh.pubkey")
+  val livyPort = config.getOrElse(LivyConf.SERVER_PORT.key, "8998").toInt
+  val livyClasspath = config.getOrElse("livy.classpath", "")
+
+  val deployLivy = config.getOrElse("deploy-livy", "true").toBoolean
+  val noDeployLivyHome = config.get("livy-home")
+
+  val sparkHome = config("env.spark_home")
+  val sparkConf = config.getOrElse("env.spark_conf", "/etc/spark/conf")
+  val hadoopConf = config.getOrElse("env.hadoop_conf", "/etc/hadoop/conf")
+
+  val javaHome = config.getOrElse("env.java_home", "/usr/java/default")
+}
+
+class RealCluster(_config: Map[String, String])
+  extends Cluster with ClusterUtils with Logging {
+
+  private val config = new RealClusterConfig(_config)
+
+  private var livyIsRunning = false
+  private var livyHomePath: String = _
+  private var livyEpoch = 0
+
+  private var _configDir: File = _
+
+  private var hdfsScratch: Path = _
+
+  private var sparkConfDir: String = _
+  private var tempDirPath: String = _
+  private var _hasSparkR: Boolean = _
+
+  override def isRealSpark(): Boolean = true
+
+  override def hasSparkR(): Boolean = _hasSparkR
+
+  override def configDir(): File = _configDir
+
+  override def hdfsScratchDir(): Path = hdfsScratch
+
+  override def doAsClusterUser[T](task: => T): T = {
+    val user = UserGroupInformation.createRemoteUser(config.sshLogin)
+    user.doAs(new PrivilegedExceptionAction[T] {
+      override def run(): T = task
+    })
+  }
+
+  private def sshClient[T](body: SshClient => SSH.Result[T]): T = {
+    val sshLogin = PublicKeyLogin(
+      config.sshLogin, None, config.sshPubKey :: Nil)
+    val hostConfig = HostConfig(login = sshLogin, hostKeyVerifier = HostKeyVerifiers.DontVerify)
+    SSH(config.ip, hostConfig)(body) match {
+      case Left(err) => throw new IOException(err)
+      case Right(result) => result
+    }
+  }
+
+  private def exec(cmd: String): CommandResult = {
+    info(s"Running command: $cmd")
+    val result = sshClient(_.exec(cmd))
+    result.exitCode match {
+      case Some(ec) if ec > 0 =>
+        throw new IOException(s"Command '$cmd' failed: $ec\n" +
+          s"stdout: ${result.stdOutAsString()}\n" +
+          s"stderr: ${result.stdErrAsString()}\n")
+      case _ =>
+    }
+    result
+  }
+
+  private def upload(local: String, remote: String): Unit = {
+    info(s"Uploading local path $local")
+    sshClient(_.upload(local, remote))
+  }
+
+  private def download(remote: String, local: String): Unit = {
+    info(s"Downloading remote path $remote")
+    sshClient(_.download(remote, local))
+  }
+
+  override def deploy(): Unit = {
+    // Make sure Livy is not running.
+    stopLivy()
+
+    // Check whether SparkR is supported in YARN (need the sparkr.zip archive).
+    _hasSparkR = try {
+      exec(s"test -f ${config.sparkHome}/R/lib/sparkr.zip")
+      true
+    } catch {
+      case e: IOException => false
+    }
+
+    // Copy the remote Hadoop configuration to a local temp dir so that tests can use it to
+    // talk to HDFS and YARN.
+    val localTemp = new File(sys.props("java.io.tmpdir") + File.separator + "hadoop-conf")
+    download(config.hadoopConf, localTemp.getAbsolutePath())
+    _configDir = localTemp
+
+    // Create a temp directory where test files will be written.
+    tempDirPath = s"/tmp/livy-it-${Random.alphanumeric.take(16).mkString}"
+    exec(s"mkdir -p $tempDirPath")
+
+    // Also create an HDFS scratch directory for tests.
+    doAsClusterUser {
+      hdfsScratch = fs.makeQualified(
+        new Path(s"/tmp/livy-it-${Random.alphanumeric.take(16).mkString}"))
+      fs.mkdirs(hdfsScratch)
+      fs.setPermission(hdfsScratch, new FsPermission("777"))
+    }
+
+    // Create a copy of the Spark configuration, and make sure the master is "yarn-cluster".
+    sparkConfDir = s"$tempDirPath/spark-conf"
+    val sparkProps = s"$sparkConfDir/spark-defaults.conf"
+    Seq(
+      s"cp -Lr ${config.sparkConf} $sparkConfDir",
+      s"touch $sparkProps",
+      s"sed -i.old '/spark.master.*/d' $sparkProps",
+      s"sed -i.old '/spark.submit.deployMode.*/d' $sparkProps",
+      s"echo 'spark.master=yarn-cluster' >> $sparkProps"
+    ).foreach(exec)
+
+    if (config.deployLivy) {
+      try {
+        info(s"Deploying Livy to ${config.ip}...")
+        val version = sys.props("project.version")
+        val assemblyZip = new File(s"../assembly/target/livy-server-$version.zip")
+        assert(assemblyZip.isFile,
+          s"Can't find livy assembly zip at ${assemblyZip.getCanonicalPath}")
+        val assemblyName = assemblyZip.getName()
+
+        // SSH to the node to unzip and install Livy.
+        upload(assemblyZip.getCanonicalPath, s"$tempDirPath/$assemblyName")
+        exec(s"unzip -o $tempDirPath/$assemblyName -d $tempDirPath")
+        livyHomePath = s"$tempDirPath/livy-server-$version"
+        info(s"Deployed Livy to ${config.ip} at $livyHomePath.")
+      } catch {
+        case e: Exception =>
+          error(s"Failed to deploy Livy to ${config.ip}.", e)
+          cleanUp()
+          throw e
+      }
+    } else {
+      livyHomePath = config.noDeployLivyHome.get
+      info("Skipping deployment.")
+    }
+
+    runLivy()
+  }
+
+  override def cleanUp(): Unit = {
+    stopLivy()
+    if (tempDirPath != null) {
+      exec(s"rm -rf $tempDirPath")
+    }
+    if (hdfsScratch != null) {
+      doAsClusterUser {
+        // Cannot use the shared `fs` since this runs in a shutdown hook, and that instance
+        // may have been closed already.
+        val fs = FileSystem.newInstance(hadoopConf)
+        try {
+          fs.delete(hdfsScratch, true)
+        } finally {
+          fs.close()
+        }
+      }
+    }
+  }
+
+  override def runLivy(): Unit = synchronized {
+    assert(!livyIsRunning, "Livy is running already.")
+
+    val livyConf = Map(
+      "livy.server.port" -> config.livyPort.toString,
+      // "livy.server.recovery.mode=local",
+      "livy.environment" -> "development",
+      LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
+      LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster")
+    val livyConfFile = File.createTempFile("livy.", ".properties")
+    saveProperties(livyConf, livyConfFile)
+    upload(livyConfFile.getAbsolutePath(), s"$livyHomePath/conf/livy.conf")
+
+    val env = Map(
+        "JAVA_HOME" -> config.javaHome,
+        "HADOOP_CONF_DIR" -> config.hadoopConf,
+        "SPARK_CONF_DIR" -> sparkConfDir,
+        "SPARK_HOME" -> config.sparkHome,
+        "CLASSPATH" -> config.livyClasspath,
+        "LIVY_PID_DIR" -> s"$tempDirPath/pid",
+        "LIVY_LOG_DIR" -> s"$tempDirPath/logs",
+        "LIVY_MAX_LOG_FILES" -> "16",
+        "LIVY_IDENT_STRING" -> "it"
+      )
+    val livyEnvFile = File.createTempFile("livy-env.", ".sh")
+    saveProperties(env, livyEnvFile)
+    upload(livyEnvFile.getAbsolutePath(), s"$livyHomePath/conf/livy-env.sh")
+
+    info(s"Starting Livy @ port ${config.livyPort}...")
+    exec(s"env -i $livyHomePath/bin/livy-server start")
+    livyIsRunning = true
+
+    val httpClient = new AsyncHttpClient()
+    eventually(timeout(1 minute), interval(1 second)) {
+      assert(httpClient.prepareGet(livyEndpoint).execute().get().getStatusCode == SC_OK)
+    }
+    info(s"Started Livy.")
+  }
+
+  // Stops the remote Livy server and, if it was running, downloads its log locally.
+  override def stopLivy(): Unit = synchronized {
+    info("Stopping Livy Server")
+    try {
+      exec(s"$livyHomePath/bin/livy-server stop")
+    } catch {
+      // A failed stop command only matters when the server was believed to be running.
+      case e: Exception if livyIsRunning => throw e
+      case _: Exception =>
+    }
+    if (livyIsRunning) {
+      // Clear the flag so that runLivy() can start the server again later.
+      livyIsRunning = false
+      // Wait a tiny bit so that the process finishes closing its output files.
+      Thread.sleep(2)
+      livyEpoch += 1
+      val logName = "livy-it-server.out"
+      val localName = s"livy-it-server-$livyEpoch.out"
+      val logPath = s"$tempDirPath/logs/$logName"
+      val localLog = sys.props("java.io.tmpdir") + File.separator + localName
+      download(logPath, localLog)
+      info(s"Log for epoch $livyEpoch available at $localLog")
+    }
+  }
+
+  override def livyEndpoint: String = s"http://${config.ip}:${config.livyPort}"
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
deleted file mode 100644
index 179239a..0000000
--- a/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test
-
-import java.io.File
-import java.util.UUID
-
-import scala.language.postfixOps
-
-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.OptionValues._
-
-import com.cloudera.livy.sessions.SessionState
-import com.cloudera.livy.test.apps._
-import com.cloudera.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}
-
-class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
-  private var testLibPath: String = _
-
-  protected override def beforeAll() = {
-    super.beforeAll()
-    testLibPath = uploadToHdfs(new File(testLib))
-  }
-
-  test("submit spark app") {
-    val output = newOutputPath()
-    withTestLib(classOf[SimpleSparkApp], List(output)) { s =>
-      s.verifySessionSuccess()
-
-      // Make sure the test lib has created the test output.
-      cluster.fs.isDirectory(new Path(output)) shouldBe true
-
-      // Make sure appInfo is reported correctly.
-      val state = s.snapshot()
-      state.appInfo.sparkUiUrl.value should startWith ("http")
-    }
-  }
-
-  test("submit an app that fails") {
-    val output = newOutputPath()
-    withTestLib(classOf[FailingApp], List(output)) { s =>
-      // At this point the application has exited. State should be 'dead' instead of 'error'.
-      s.verifySessionDead()
-
-      // The file is written to make sure the app actually ran, instead of just failing for
-      // some other reason.
-      cluster.fs.isFile(new Path(output)) shouldBe true
-    }
-  }
-
-  pytest("submit a pyspark application") {
-    val scriptPath = uploadResource("batch.py")
-    val output = newOutputPath()
-    withScript(scriptPath, List(output)) { s =>
-      s.verifySessionSuccess()
-      cluster.fs.isDirectory(new Path(output)) shouldBe true
-    }
-  }
-
-  // This is disabled since R scripts don't seem to work in yarn-cluster mode. There's a
-  // TODO comment in Spark's ApplicationMaster.scala.
-  ignore("submit a SparkR application") {
-    val hdfsPath = uploadResource("rtest.R")
-    withScript(hdfsPath, List.empty) { s =>
-      s.verifySessionSuccess()
-    }
-  }
-
-  test("deleting a session should kill YARN app") {
-    val output = newOutputPath()
-    withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
-      s.verifySessionState(SessionState.Running())
-      s.snapshot().appInfo.driverLogUrl.value should include ("containerlogs")
-
-      val appId = s.appId()
-
-      // Delete the session then verify the YARN app state is KILLED.
-      s.stop()
-      cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus shouldBe
-        FinalApplicationStatus.KILLED
-    }
-  }
-
-  test("killing YARN app should change batch state to dead") {
-    val output = newOutputPath()
-    withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
-      s.verifySessionState(SessionState.Running())
-      val appId = s.appId()
-
-      // Kill the YARN app and check batch state should be KILLED.
-      cluster.yarnClient.killApplication(appId)
-      s.verifySessionDead()
-
-      cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus shouldBe
-        FinalApplicationStatus.KILLED
-
-      s.snapshot().log should contain ("Application killed by user.")
-    }
-  }
-
-  test("recover batch sessions") {
-    val output1 = newOutputPath()
-    val output2 = newOutputPath()
-    withTestLib(classOf[SimpleSparkApp], List(output1)) { s1 =>
-      s1.stop()
-      withTestLib(classOf[SimpleSparkApp], List(output2, "false")) { s2 =>
-        s2.verifySessionRunning()
-
-        restartLivy()
-
-        // Verify previous active session still appears after restart.
-        s2.verifySessionRunning()
-        // Verify deleted session doesn't show up.
-        s1.verifySessionDoesNotExist()
-
-        s2.stop()
-
-        // Verify new session doesn't reuse old session id.
-        withTestLib(classOf[SimpleSparkApp], List(output2)) { s3 =>
-          s3.id should be > s2.id
-        }
-      }
-    }
-  }
-
-  private def newOutputPath(): String = {
-    cluster.hdfsScratchDir().toString() + "/" + UUID.randomUUID().toString()
-  }
-
-  private def uploadResource(name: String): String = {
-    val hdfsPath = new Path(cluster.hdfsScratchDir(), UUID.randomUUID().toString + "-" + name)
-    val in = getClass.getResourceAsStream("/" + name)
-    val out = cluster.fs.create(hdfsPath)
-    try {
-      IOUtils.copy(in, out)
-    } finally {
-      in.close()
-      out.close()
-    }
-    hdfsPath.toUri().getPath()
-  }
-
-  private def withScript[R]
-    (scriptPath: String, args: List[String], sparkConf: Map[String, String] = Map.empty)
-    (f: (LivyRestClient#BatchSession) => R): R = {
-    val s = livyClient.startBatch(scriptPath, None, args, sparkConf)
-    withSession(s)(f)
-  }
-
-  private def withTestLib[R]
-    (testClass: Class[_], args: List[String], sparkConf: Map[String, String] = Map.empty)
-    (f: (LivyRestClient#BatchSession) => R): R = {
-    val s = livyClient.startBatch(testLibPath, Some(testClass.getName()), args, sparkConf)
-    withSession(s)(f)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala
deleted file mode 100644
index 88a995c..0000000
--- a/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test
-
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.regex.Pattern
-
-import org.apache.hadoop.yarn.api.records.YarnApplicationState
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.OptionValues._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.cloudera.livy.rsc.RSCConf
-import com.cloudera.livy.sessions._
-import com.cloudera.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}
-
-class InteractiveIT extends BaseIntegrationTestSuite {
-  test("basic interactive session") {
-    withNewSession(Spark()) { s =>
-      s.run("val sparkVersion = sc.version").result().left.foreach(info(_))
-      s.run("1+1").verifyResult("res0: Int = 2")
-      s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1")
-      s.run("val sql = new org.apache.spark.sql.SQLContext(sc)").verifyResult(
-        ".*" + Pattern.quote(
-        "sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext") + ".*")
-      s.run("abcde").verifyError(evalue = ".*?:[0-9]+: error: not found: value abcde.*")
-      s.run("throw new IllegalStateException()")
-        .verifyError(evalue = ".*java\\.lang\\.IllegalStateException.*")
-
-      // Verify Livy internal configurations are not exposed.
-      // TODO separate all these checks to different sub tests after merging new IT code.
-      s.run("""sc.getConf.getAll.exists(_._1.startsWith("spark.__livy__."))""")
-        .verifyResult(".*false")
-      s.run("""sys.props.exists(_._1.startsWith("spark.__livy__."))""").verifyResult(".*false")
-
-      // Make sure appInfo is reported correctly.
-      val state = s.snapshot()
-      state.appInfo.driverLogUrl.value should include ("containerlogs")
-      state.appInfo.sparkUiUrl.value should startWith ("http")
-
-      // Stop session and verify the YARN app state is finished.
-      // This is important because if YARN app state is killed, Spark history is not archived.
-      val appId = s.appId()
-      s.stop()
-      val appReport = cluster.yarnClient.getApplicationReport(appId)
-      appReport.getYarnApplicationState() shouldEqual YarnApplicationState.FINISHED
-    }
-  }
-
-  pytest("pyspark interactive session") {
-    withNewSession(PySpark()) { s =>
-      s.run("1+1").verifyResult("2")
-      s.run("sqlContext").verifyResult(startsWith("<pyspark.sql.context.HiveContext"))
-      s.run("sc.parallelize(range(100)).map(lambda x: x * 2).reduce(lambda x, y: x + y)")
-        .verifyResult("9900")
-      s.run("from pyspark.sql.types import Row").verifyResult("")
-      s.run("x = [Row(age=1, name=u'a'), Row(age=2, name=u'b'), Row(age=3, name=u'c')]")
-        .verifyResult("")
-      s.run("%table x").verifyResult(".*headers.*type.*name.*data.*")
-      s.run("abcde").verifyError(ename = "NameError", evalue = "name 'abcde' is not defined")
-      s.run("raise KeyError, 'foo'").verifyError(ename = "KeyError", evalue = "'foo'")
-    }
-  }
-
-  rtest("R interactive session") {
-    withNewSession(SparkR()) { s =>
-      // R's output sometimes includes the count of statements, which makes it annoying to test
-      // things. This helps a bit.
-      val curr = new AtomicInteger()
-      def count: Int = curr.incrementAndGet()
-
-      s.run("1+1").verifyResult(startsWith(s"[$count] 2"))
-      s.run("sqlContext <- sparkRSQL.init(sc)").verifyResult(null)
-      s.run("hiveContext <- sparkRHive.init(sc)").verifyResult(null)
-      s.run("""localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))""")
-        .verifyResult(null)
-      s.run("df <- createDataFrame(sqlContext, localDF)").verifyResult(null)
-      s.run("printSchema(df)").verifyResult(literal(
-        """|root
-          | |-- name: string (nullable = true)
-          | |-- age: double (nullable = true)""".stripMargin))
-    }
-  }
-
-  test("application kills session") {
-    withNewSession(Spark()) { s =>
-      s.runFatalStatement("System.exit(0)")
-    }
-  }
-
-  test("should kill RSCDriver if it doesn't respond to end session") {
-    val testConfName = s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_END_SESSION.key()}"
-    withNewSession(Spark(), Map(testConfName -> "true")) { s =>
-      val appId = s.appId()
-      s.stop()
-      val appReport = cluster.yarnClient.getApplicationReport(appId)
-      appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
-    }
-  }
-
-  test("should kill RSCDriver if it didn't register itself in time") {
-    val testConfName =
-      s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_START_DRIVER.key()}"
-    withNewSession(Spark(), Map(testConfName -> "true"), false) { s =>
-      eventually(timeout(2 minutes), interval(5 seconds)) {
-        val appId = s.appId()
-        appId should not be null
-        val appReport = cluster.yarnClient.getApplicationReport(appId)
-        appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
-      }
-    }
-  }
-
-  test("user jars are properly imported in Scala interactive sessions") {
-    // Include a popular Java library to test importing user jars.
-    val sparkConf = Map("spark.jars.packages" -> "org.codehaus.plexus:plexus-utils:3.0.24")
-    withNewSession(Spark(), sparkConf) { s =>
-      // Check is the library loaded in JVM in the proper class loader.
-      s.run("Thread.currentThread.getContextClassLoader.loadClass" +
-          """("org.codehaus.plexus.util.FileUtils")""")
-        .verifyResult(".*Class\\[_\\] = class org.codehaus.plexus.util.FileUtils")
-
-      // Check does Scala interpreter see the library.
-      s.run("import org.codehaus.plexus.util._").verifyResult("import org.codehaus.plexus.util._")
-
-      // Check does SparkContext see classes defined by Scala interpreter.
-      s.run("case class Item(i: Int)").verifyResult("defined class Item")
-      s.run("val rdd = sc.parallelize(Array.fill(10){new Item(scala.util.Random.nextInt(1000))})")
-        .verifyResult("rdd.*")
-      s.run("rdd.count()").verifyResult(".*= 10")
-    }
-  }
-
-  test("heartbeat should kill expired session") {
-    // Set it to 2s because verifySessionIdle() is calling GET every second.
-    val heartbeatTimeout = Duration.create("2s")
-    withNewSession(Spark(), Map.empty, true, heartbeatTimeout.toSeconds.toInt) { s =>
-      // If the test reaches here, that means verifySessionIdle() is successfully keeping the
-      // session alive. Now verify heartbeat is killing expired session.
-      Thread.sleep(heartbeatTimeout.toMillis * 2)
-      s.verifySessionDoesNotExist()
-    }
-  }
-
-  test("recover interactive session") {
-    withNewSession(Spark()) { s =>
-      val stmt1 = s.run("1")
-      stmt1.verifyResult("res0: Int = 1")
-
-      restartLivy()
-
-      // Verify session still exists.
-      s.verifySessionIdle()
-      s.run("2").verifyResult("res1: Int = 2")
-      // Verify statement result is preserved.
-      stmt1.verifyResult("res0: Int = 1")
-
-      s.stop()
-
-      restartLivy()
-
-      // Verify deleted session doesn't show up after recovery.
-      s.verifySessionDoesNotExist()
-
-      // Verify new session doesn't reuse old session id.
-      withNewSession(Spark(), Map.empty, false) { s1 =>
-        s1.id should be > s.id
-      }
-    }
-  }
-
-  private def withNewSession[R] (
-      kind: Kind,
-      sparkConf: Map[String, String] = Map.empty,
-      waitForIdle: Boolean = true,
-      heartbeatTimeoutInSecond: Int = 0)
-    (f: (LivyRestClient#InteractiveSession) => R): R = {
-    withSession(livyClient.startSession(kind, sparkConf, heartbeatTimeoutInSecond)) { s =>
-      if (waitForIdle) {
-        s.verifySessionIdle()
-      }
-      f(s)
-    }
-  }
-
-  private def startsWith(result: String): String = Pattern.quote(result) + ".*"
-
-  private def literal(result: String): String = Pattern.quote(result)
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala
deleted file mode 100644
index 87dcfe1..0000000
--- a/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test
-
-import java.io.{File, InputStream}
-import java.net.URI
-import java.nio.charset.StandardCharsets.UTF_8
-import java.nio.file.{Files, StandardCopyOption}
-import java.util.concurrent.{Future => JFuture, TimeUnit}
-import javax.servlet.http.HttpServletResponse
-
-import scala.collection.JavaConverters._
-import scala.util.{Properties, Try}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.scalatest.BeforeAndAfterAll
-
-import com.cloudera.livy._
-import com.cloudera.livy.client.common.HttpMessages._
-import com.cloudera.livy.sessions.SessionKindModule
-import com.cloudera.livy.test.framework.BaseIntegrationTestSuite
-import com.cloudera.livy.test.jobs._
-import com.cloudera.livy.utils.LivySparkUtils
-
-// Proper type representing the return value of "GET /sessions". At some point we should make
-// SessionServlet use something like this.
-class SessionList {
-  val from: Int = -1
-  val total: Int = -1
-  val sessions: List[SessionInfo] = Nil
-}
-
-class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging {
-
-  private var client: LivyClient = _
-  private var sessionId: Int = _
-  private var client2: LivyClient = _
-  private val mapper = new ObjectMapper()
-    .registerModule(DefaultScalaModule)
-    .registerModule(new SessionKindModule())
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    Seq(client, client2).foreach { c =>
-      if (c != null) {
-        c.stop(true)
-      }
-    }
-
-    livyClient.connectSession(sessionId).stop()
-  }
-
-  test("create a new session and upload test jar") {
-    val tempClient = createClient(livyEndpoint)
-
-    try {
-      // Figure out the session ID by poking at the REST endpoint. We should probably expose this
-      // in the Java API.
-      val list = sessionList()
-      assert(list.total === 1)
-      val tempSessionId = list.sessions(0).id
-
-      livyClient.connectSession(tempSessionId).verifySessionIdle()
-      waitFor(tempClient.uploadJar(new File(testLib)))
-
-      client = tempClient
-      sessionId = tempSessionId
-    } finally {
-      if (client == null) {
-        try {
-          tempClient.stop(true)
-        } catch {
-          case e: Exception => warn("Error stopping client.", e)
-        }
-      }
-    }
-  }
-
-  test("upload file") {
-    assume(client != null, "Client not active.")
-
-    val file = Files.createTempFile("filetest", ".txt")
-    Files.write(file, "hello".getBytes(UTF_8))
-
-    waitFor(client.uploadFile(file.toFile()))
-
-    val result = waitFor(client.submit(new FileReader(file.toFile().getName(), false)))
-    assert(result === "hello")
-  }
-
-  test("add file from HDFS") {
-    assume(client != null, "Client not active.")
-    val file = Files.createTempFile("filetest2", ".txt")
-    Files.write(file, "hello".getBytes(UTF_8))
-
-    val uri = new URI(uploadToHdfs(file.toFile()))
-    waitFor(client.addFile(uri))
-
-    val task = new FileReader(new File(uri.getPath()).getName(), false)
-    val result = waitFor(client.submit(task))
-    assert(result === "hello")
-  }
-
-  test("run simple jobs") {
-    assume(client != null, "Client not active.")
-
-    val result = waitFor(client.submit(new Echo("hello")))
-    assert(result === "hello")
-
-    val result2 = waitFor(client.run(new Echo("hello")))
-    assert(result2 === "hello")
-  }
-
-  test("run spark job") {
-    assume(client != null, "Client not active.")
-    val result = waitFor(client.submit(new SmallCount(100)))
-    assert(result === 100)
-  }
-
-  test("run spark sql job") {
-    assume(client != null, "Client not active.")
-    val result = waitFor(client.submit(new SQLGetTweets(false)))
-    assert(result.size() > 0)
-  }
-
-  test("stop a client without destroying the session") {
-    assume(client != null, "Client not active.")
-    client.stop(false)
-    client = null
-  }
-
-  test("connect to an existing session") {
-    livyClient.connectSession(sessionId).verifySessionIdle()
-    val sessionUri = s"$livyEndpoint/sessions/$sessionId"
-    client2 = createClient(sessionUri)
-  }
-
-  test("submit job using new client") {
-    assume(client2 != null, "Client not active.")
-    val result = waitFor(client2.submit(new Echo("hello")))
-    assert(result === "hello")
-  }
-
-  scalaTest("run scala jobs") {
-    assume(client2 != null, "Client not active.")
-
-    val jobs = Seq(
-      new ScalaEcho("abcde"),
-      new ScalaEcho(Seq(1, 2, 3, 4)),
-      new ScalaEcho(Map(1 -> 2, 3 -> 4)),
-      new ScalaEcho(ValueHolder("abcde")),
-      new ScalaEcho(ValueHolder(Seq(1, 2, 3, 4))),
-      new ScalaEcho(Some("abcde"))
-    )
-
-    jobs.foreach { job =>
-      val result = waitFor(client2.submit(job))
-      assert(result === job.value)
-    }
-  }
-
-  protected def scalaTest(desc: String)(testFn: => Unit): Unit = {
-    test(desc) {
-      assume(sys.env("LIVY_SPARK_SCALA_VERSION") ==
-        LivySparkUtils.formatScalaVersion(Properties.versionNumberString),
-        s"Scala test can only be run with ${Properties.versionString}")
-      testFn
-    }
-  }
-
-  test("ensure failing jobs do not affect session state") {
-    assume(client2 != null, "Client not active.")
-
-    try {
-      waitFor(client2.submit(new Failure()))
-      fail("Job should have failued.")
-    } catch {
-      case e: Exception =>
-        assert(e.getMessage().contains(classOf[Failure.JobFailureException].getName()))
-    }
-
-    val result = waitFor(client2.submit(new Echo("foo")))
-    assert(result === "foo")
-  }
-
-  test("return null should not throw NPE") {
-    assume(client2 != null, "Client not active")
-
-    val result = waitFor(client2.submit(new VoidJob()))
-    assert(result === null)
-  }
-
-  test("destroy the session") {
-    assume(client2 != null, "Client not active.")
-    client2.stop(true)
-
-    val list = sessionList()
-    assert(list.total === 0)
-
-    val sessionUri = s"$livyEndpoint/sessions/$sessionId"
-    Try(createClient(sessionUri)).foreach { client =>
-      client.stop(true)
-      fail("Should not have been able to connect to destroyed session.")
-    }
-
-    sessionId = -1
-  }
-
-  pytest("validate Python-API requests") {
-    val addFileContent = "hello from addfile"
-    val addFilePath = createTempFilesForTest("add_file", ".txt", addFileContent, true)
-    val addPyFileContent = "def test_add_pyfile(): return \"hello from addpyfile\""
-    val addPyFilePath = createTempFilesForTest("add_pyfile", ".py", addPyFileContent, true)
-    val uploadFileContent = "hello from uploadfile"
-    val uploadFilePath = createTempFilesForTest("upload_pyfile", ".py", uploadFileContent, false)
-    val uploadPyFileContent = "def test_upload_pyfile(): return \"hello from uploadpyfile\""
-    val uploadPyFilePath = createTempFilesForTest("upload_pyfile", ".py",
-      uploadPyFileContent, false)
-
-    val builder = new ProcessBuilder(Seq("python", createPyTestsForPythonAPI().toString).asJava)
-
-    val env = builder.environment()
-    env.put("LIVY_END_POINT", livyEndpoint)
-    env.put("ADD_FILE_URL", addFilePath)
-    env.put("ADD_PYFILE_URL", addPyFilePath)
-    env.put("UPLOAD_FILE_URL", uploadFilePath)
-    env.put("UPLOAD_PYFILE_URL", uploadPyFilePath)
-
-    builder.redirectOutput(new File(sys.props("java.io.tmpdir") + "/pytest_results.txt"))
-    builder.redirectErrorStream(true)
-
-    val process = builder.start()
-
-    process.waitFor()
-
-    assert(process.exitValue() === 0)
-  }
-
-  private def createPyTestsForPythonAPI(): File = {
-    var source: InputStream = null
-    try {
-      source = getClass.getClassLoader.getResourceAsStream("test_python_api.py")
-      val file = Files.createTempFile("", "").toFile
-      Files.copy(source, file.toPath, StandardCopyOption.REPLACE_EXISTING)
-      file
-    } finally {
-      source.close()
-    }
-  }
-
-  private def createTempFilesForTest(
-      fileName: String,
-      fileExtension: String,
-      fileContent: String,
-      uploadFileToHdfs: Boolean): String = {
-    val path = Files.createTempFile(fileName, fileExtension)
-    Files.write(path, fileContent.getBytes(UTF_8))
-    if (uploadFileToHdfs) {
-      uploadToHdfs(path.toFile())
-    } else {
-      path.toString
-    }
-  }
-
-  private def waitFor[T](future: JFuture[T]): T = {
-    future.get(30, TimeUnit.SECONDS)
-  }
-
-  private def sessionList(): SessionList = {
-    val response = httpClient.prepareGet(s"$livyEndpoint/sessions/").execute().get()
-    assert(response.getStatusCode === HttpServletResponse.SC_OK)
-    mapper.readValue(response.getResponseBodyAsStream, classOf[SessionList])
-  }
-
-  private def createClient(uri: String): LivyClient = {
-    new LivyClientBuilder().setURI(new URI(uri)).build()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
new file mode 100644
index 0000000..0b3a061
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test
+
+import java.io.File
+import java.util.UUID
+
+import scala.language.postfixOps
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.OptionValues._
+
+import org.apache.livy.sessions.SessionState
+import org.apache.livy.test.apps._
+import org.apache.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}
+
+class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
+  private var testLibPath: String = _
+
+  protected override def beforeAll() = {
+    super.beforeAll()
+    testLibPath = uploadToHdfs(new File(testLib))
+  }
+
+  test("submit spark app") {
+    val output = newOutputPath()
+    withTestLib(classOf[SimpleSparkApp], List(output)) { s =>
+      s.verifySessionSuccess()
+
+      // Make sure the test lib has created the test output.
+      cluster.fs.isDirectory(new Path(output)) shouldBe true
+
+      // Make sure appInfo is reported correctly.
+      val state = s.snapshot()
+      state.appInfo.sparkUiUrl.value should startWith ("http")
+    }
+  }
+
+  test("submit an app that fails") {
+    val output = newOutputPath()
+    withTestLib(classOf[FailingApp], List(output)) { s =>
+      // At this point the application has exited. State should be 'dead' instead of 'error'.
+      s.verifySessionDead()
+
+      // The file is written to make sure the app actually ran, instead of just failing for
+      // some other reason.
+      cluster.fs.isFile(new Path(output)) shouldBe true
+    }
+  }
+
+  pytest("submit a pyspark application") {
+    val scriptPath = uploadResource("batch.py")
+    val output = newOutputPath()
+    withScript(scriptPath, List(output)) { s =>
+      s.verifySessionSuccess()
+      cluster.fs.isDirectory(new Path(output)) shouldBe true
+    }
+  }
+
+  // This is disabled since R scripts don't seem to work in yarn-cluster mode. There's a
+  // TODO comment in Spark's ApplicationMaster.scala.
+  ignore("submit a SparkR application") {
+    val hdfsPath = uploadResource("rtest.R")
+    withScript(hdfsPath, List.empty) { s =>
+      s.verifySessionSuccess()
+    }
+  }
+
+  test("deleting a session should kill YARN app") {
+    val output = newOutputPath()
+    withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
+      s.verifySessionState(SessionState.Running())
+      s.snapshot().appInfo.driverLogUrl.value should include ("containerlogs")
+
+      val appId = s.appId()
+
+      // Delete the session then verify the YARN app state is KILLED.
+      s.stop()
+      cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus shouldBe
+        FinalApplicationStatus.KILLED
+    }
+  }
+
+  test("killing YARN app should change batch state to dead") {
+    val output = newOutputPath()
+    withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
+      s.verifySessionState(SessionState.Running())
+      val appId = s.appId()
+
+      // Kill the YARN app and check batch state should be KILLED.
+      cluster.yarnClient.killApplication(appId)
+      s.verifySessionDead()
+
+      cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus shouldBe
+        FinalApplicationStatus.KILLED
+
+      s.snapshot().log should contain ("Application killed by user.")
+    }
+  }
+
+  test("recover batch sessions") {
+    val output1 = newOutputPath()
+    val output2 = newOutputPath()
+    withTestLib(classOf[SimpleSparkApp], List(output1)) { s1 =>
+      s1.stop()
+      withTestLib(classOf[SimpleSparkApp], List(output2, "false")) { s2 =>
+        s2.verifySessionRunning()
+
+        restartLivy()
+
+        // Verify previous active session still appears after restart.
+        s2.verifySessionRunning()
+        // Verify deleted session doesn't show up.
+        s1.verifySessionDoesNotExist()
+
+        s2.stop()
+
+        // Verify new session doesn't reuse old session id.
+        withTestLib(classOf[SimpleSparkApp], List(output2)) { s3 =>
+          s3.id should be > s2.id
+        }
+      }
+    }
+  }
+
+  private def newOutputPath(): String = {
+    cluster.hdfsScratchDir().toString() + "/" + UUID.randomUUID().toString()
+  }
+
+  private def uploadResource(name: String): String = {
+    val hdfsPath = new Path(cluster.hdfsScratchDir(), UUID.randomUUID().toString + "-" + name)
+    val in = getClass.getResourceAsStream("/" + name)
+    val out = cluster.fs.create(hdfsPath)
+    try {
+      IOUtils.copy(in, out)
+    } finally {
+      in.close()
+      out.close()
+    }
+    hdfsPath.toUri().getPath()
+  }
+
+  private def withScript[R]
+    (scriptPath: String, args: List[String], sparkConf: Map[String, String] = Map.empty)
+    (f: (LivyRestClient#BatchSession) => R): R = {
+    val s = livyClient.startBatch(scriptPath, None, args, sparkConf)
+    withSession(s)(f)
+  }
+
+  private def withTestLib[R]
+    (testClass: Class[_], args: List[String], sparkConf: Map[String, String] = Map.empty)
+    (f: (LivyRestClient#BatchSession) => R): R = {
+    val s = livyClient.startBatch(testLibPath, Some(testClass.getName()), args, sparkConf)
+    withSession(s)(f)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
new file mode 100644
index 0000000..728fd1c
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.regex.Pattern
+
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.OptionValues._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.sessions._
+import org.apache.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}
+
+class InteractiveIT extends BaseIntegrationTestSuite {
+  test("basic interactive session") {
+    withNewSession(Spark()) { s =>
+      s.run("val sparkVersion = sc.version").result().left.foreach(info(_))
+      s.run("1+1").verifyResult("res0: Int = 2")
+      s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1")
+      s.run("val sql = new org.apache.spark.sql.SQLContext(sc)").verifyResult(
+        ".*" + Pattern.quote(
+        "sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext") + ".*")
+      s.run("abcde").verifyError(evalue = ".*?:[0-9]+: error: not found: value abcde.*")
+      s.run("throw new IllegalStateException()")
+        .verifyError(evalue = ".*java\\.lang\\.IllegalStateException.*")
+
+      // Verify Livy internal configurations are not exposed.
+      // TODO separate all these checks to different sub tests after merging new IT code.
+      s.run("""sc.getConf.getAll.exists(_._1.startsWith("spark.__livy__."))""")
+        .verifyResult(".*false")
+      s.run("""sys.props.exists(_._1.startsWith("spark.__livy__."))""").verifyResult(".*false")
+
+      // Make sure appInfo is reported correctly.
+      val state = s.snapshot()
+      state.appInfo.driverLogUrl.value should include ("containerlogs")
+      state.appInfo.sparkUiUrl.value should startWith ("http")
+
+      // Stop session and verify the YARN app state is finished.
+      // This is important because if YARN app state is killed, Spark history is not archived.
+      val appId = s.appId()
+      s.stop()
+      val appReport = cluster.yarnClient.getApplicationReport(appId)
+      appReport.getYarnApplicationState() shouldEqual YarnApplicationState.FINISHED
+    }
+  }
+
+  pytest("pyspark interactive session") {
+    withNewSession(PySpark()) { s =>
+      s.run("1+1").verifyResult("2")
+      s.run("sqlContext").verifyResult(startsWith("<pyspark.sql.context.HiveContext"))
+      s.run("sc.parallelize(range(100)).map(lambda x: x * 2).reduce(lambda x, y: x + y)")
+        .verifyResult("9900")
+      s.run("from pyspark.sql.types import Row").verifyResult("")
+      s.run("x = [Row(age=1, name=u'a'), Row(age=2, name=u'b'), Row(age=3, name=u'c')]")
+        .verifyResult("")
+      s.run("%table x").verifyResult(".*headers.*type.*name.*data.*")
+      s.run("abcde").verifyError(ename = "NameError", evalue = "name 'abcde' is not defined")
+      s.run("raise KeyError, 'foo'").verifyError(ename = "KeyError", evalue = "'foo'")
+    }
+  }
+
+  rtest("R interactive session") {
+    withNewSession(SparkR()) { s =>
+      // R's output sometimes includes the count of statements, which makes it annoying to test
+      // things. This helps a bit.
+      val curr = new AtomicInteger()
+      def count: Int = curr.incrementAndGet()
+
+      s.run("1+1").verifyResult(startsWith(s"[$count] 2"))
+      s.run("sqlContext <- sparkRSQL.init(sc)").verifyResult(null)
+      s.run("hiveContext <- sparkRHive.init(sc)").verifyResult(null)
+      s.run("""localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))""")
+        .verifyResult(null)
+      s.run("df <- createDataFrame(sqlContext, localDF)").verifyResult(null)
+      s.run("printSchema(df)").verifyResult(literal(
+        """|root
+          | |-- name: string (nullable = true)
+          | |-- age: double (nullable = true)""".stripMargin))
+    }
+  }
+
+  test("application kills session") {
+    withNewSession(Spark()) { s =>
+      s.runFatalStatement("System.exit(0)")
+    }
+  }
+
+  test("should kill RSCDriver if it doesn't respond to end session") {
+    val testConfName = s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_END_SESSION.key()}"
+    withNewSession(Spark(), Map(testConfName -> "true")) { s =>
+      val appId = s.appId()
+      s.stop()
+      val appReport = cluster.yarnClient.getApplicationReport(appId)
+      appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
+    }
+  }
+
+  test("should kill RSCDriver if it didn't register itself in time") {
+    val testConfName =
+      s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_START_DRIVER.key()}"
+    withNewSession(Spark(), Map(testConfName -> "true"), false) { s =>
+      eventually(timeout(2 minutes), interval(5 seconds)) {
+        val appId = s.appId()
+        appId should not be null
+        val appReport = cluster.yarnClient.getApplicationReport(appId)
+        appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
+      }
+    }
+  }
+
+  test("user jars are properly imported in Scala interactive sessions") {
+    // Include a popular Java library to test importing user jars.
+    val sparkConf = Map("spark.jars.packages" -> "org.codehaus.plexus:plexus-utils:3.0.24")
+    withNewSession(Spark(), sparkConf) { s =>
+      // Check is the library loaded in JVM in the proper class loader.
+      s.run("Thread.currentThread.getContextClassLoader.loadClass" +
+          """("org.codehaus.plexus.util.FileUtils")""")
+        .verifyResult(".*Class\\[_\\] = class org.codehaus.plexus.util.FileUtils")
+
+      // Check does Scala interpreter see the library.
+      s.run("import org.codehaus.plexus.util._").verifyResult("import org.codehaus.plexus.util._")
+
+      // Check does SparkContext see classes defined by Scala interpreter.
+      s.run("case class Item(i: Int)").verifyResult("defined class Item")
+      s.run("val rdd = sc.parallelize(Array.fill(10){new Item(scala.util.Random.nextInt(1000))})")
+        .verifyResult("rdd.*")
+      s.run("rdd.count()").verifyResult(".*= 10")
+    }
+  }
+
+  test("heartbeat should kill expired session") {
+    // Set it to 2s because verifySessionIdle() is calling GET every second.
+    val heartbeatTimeout = Duration.create("2s")
+    withNewSession(Spark(), Map.empty, true, heartbeatTimeout.toSeconds.toInt) { s =>
+      // If the test reaches here, that means verifySessionIdle() is successfully keeping the
+      // session alive. Now verify heartbeat is killing expired session.
+      Thread.sleep(heartbeatTimeout.toMillis * 2)
+      s.verifySessionDoesNotExist()
+    }
+  }
+
+  test("recover interactive session") {
+    withNewSession(Spark()) { s =>
+      val stmt1 = s.run("1")
+      stmt1.verifyResult("res0: Int = 1")
+
+      restartLivy()
+
+      // Verify session still exists.
+      s.verifySessionIdle()
+      s.run("2").verifyResult("res1: Int = 2")
+      // Verify statement result is preserved.
+      stmt1.verifyResult("res0: Int = 1")
+
+      s.stop()
+
+      restartLivy()
+
+      // Verify deleted session doesn't show up after recovery.
+      s.verifySessionDoesNotExist()
+
+      // Verify new session doesn't reuse old session id.
+      withNewSession(Spark(), Map.empty, false) { s1 =>
+        s1.id should be > s.id
+      }
+    }
+  }
+
+  private def withNewSession[R] (
+      kind: Kind,
+      sparkConf: Map[String, String] = Map.empty,
+      waitForIdle: Boolean = true,
+      heartbeatTimeoutInSecond: Int = 0)
+    (f: (LivyRestClient#InteractiveSession) => R): R = {
+    withSession(livyClient.startSession(kind, sparkConf, heartbeatTimeoutInSecond)) { s =>
+      if (waitForIdle) {
+        s.verifySessionIdle()
+      }
+      f(s)
+    }
+  }
+
+  private def startsWith(result: String): String = Pattern.quote(result) + ".*"
+
+  private def literal(result: String): String = Pattern.quote(result)
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
new file mode 100644
index 0000000..becaaac
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test
+
+import java.io.{File, InputStream}
+import java.net.URI
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, StandardCopyOption}
+import java.util.concurrent.{Future => JFuture, TimeUnit}
+import javax.servlet.http.HttpServletResponse
+
+import scala.collection.JavaConverters._
+import scala.util.{Properties, Try}
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.livy._
+import org.apache.livy.client.common.HttpMessages._
+import org.apache.livy.sessions.SessionKindModule
+import org.apache.livy.test.framework.BaseIntegrationTestSuite
+import org.apache.livy.test.jobs._
+import org.apache.livy.utils.LivySparkUtils
+
+// Proper type representing the return value of "GET /sessions". At some point we should make
+// SessionServlet use something like this.
+class SessionList {
+  val from: Int = -1
+  val total: Int = -1
+  val sessions: List[SessionInfo] = Nil
+}
+
+class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging {
+
+  private var client: LivyClient = _
+  private var sessionId: Int = _
+  private var client2: LivyClient = _
+  private val mapper = new ObjectMapper()
+    .registerModule(DefaultScalaModule)
+    .registerModule(new SessionKindModule())
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    Seq(client, client2).foreach { c =>
+      if (c != null) {
+        c.stop(true)
+      }
+    }
+
+    livyClient.connectSession(sessionId).stop()
+  }
+
+  test("create a new session and upload test jar") {
+    val tempClient = createClient(livyEndpoint)
+
+    try {
+      // Figure out the session ID by poking at the REST endpoint. We should probably expose this
+      // in the Java API.
+      val list = sessionList()
+      assert(list.total === 1)
+      val tempSessionId = list.sessions(0).id
+
+      livyClient.connectSession(tempSessionId).verifySessionIdle()
+      waitFor(tempClient.uploadJar(new File(testLib)))
+
+      client = tempClient
+      sessionId = tempSessionId
+    } finally {
+      if (client == null) {
+        try {
+          tempClient.stop(true)
+        } catch {
+          case e: Exception => warn("Error stopping client.", e)
+        }
+      }
+    }
+  }
+
+  test("upload file") {
+    assume(client != null, "Client not active.")
+
+    val file = Files.createTempFile("filetest", ".txt")
+    Files.write(file, "hello".getBytes(UTF_8))
+
+    waitFor(client.uploadFile(file.toFile()))
+
+    val result = waitFor(client.submit(new FileReader(file.toFile().getName(), false)))
+    assert(result === "hello")
+  }
+
+  test("add file from HDFS") {
+    assume(client != null, "Client not active.")
+    val file = Files.createTempFile("filetest2", ".txt")
+    Files.write(file, "hello".getBytes(UTF_8))
+
+    val uri = new URI(uploadToHdfs(file.toFile()))
+    waitFor(client.addFile(uri))
+
+    val task = new FileReader(new File(uri.getPath()).getName(), false)
+    val result = waitFor(client.submit(task))
+    assert(result === "hello")
+  }
+
+  test("run simple jobs") {
+    assume(client != null, "Client not active.")
+
+    val result = waitFor(client.submit(new Echo("hello")))
+    assert(result === "hello")
+
+    val result2 = waitFor(client.run(new Echo("hello")))
+    assert(result2 === "hello")
+  }
+
+  test("run spark job") {
+    assume(client != null, "Client not active.")
+    val result = waitFor(client.submit(new SmallCount(100)))
+    assert(result === 100)
+  }
+
+  test("run spark sql job") {
+    assume(client != null, "Client not active.")
+    val result = waitFor(client.submit(new SQLGetTweets(false)))
+    assert(result.size() > 0)
+  }
+
+  test("stop a client without destroying the session") {
+    assume(client != null, "Client not active.")
+    client.stop(false)
+    client = null
+  }
+
+  test("connect to an existing session") {
+    livyClient.connectSession(sessionId).verifySessionIdle()
+    val sessionUri = s"$livyEndpoint/sessions/$sessionId"
+    client2 = createClient(sessionUri)
+  }
+
+  test("submit job using new client") {
+    assume(client2 != null, "Client not active.")
+    val result = waitFor(client2.submit(new Echo("hello")))
+    assert(result === "hello")
+  }
+
+  scalaTest("run scala jobs") {
+    assume(client2 != null, "Client not active.")
+
+    val jobs = Seq(
+      new ScalaEcho("abcde"),
+      new ScalaEcho(Seq(1, 2, 3, 4)),
+      new ScalaEcho(Map(1 -> 2, 3 -> 4)),
+      new ScalaEcho(ValueHolder("abcde")),
+      new ScalaEcho(ValueHolder(Seq(1, 2, 3, 4))),
+      new ScalaEcho(Some("abcde"))
+    )
+
+    jobs.foreach { job =>
+      val result = waitFor(client2.submit(job))
+      assert(result === job.value)
+    }
+  }
+
+  protected def scalaTest(desc: String)(testFn: => Unit): Unit = {
+    test(desc) {
+      assume(sys.env("LIVY_SPARK_SCALA_VERSION") ==
+        LivySparkUtils.formatScalaVersion(Properties.versionNumberString),
+        s"Scala test can only be run with ${Properties.versionString}")
+      testFn
+    }
+  }
+
+  test("ensure failing jobs do not affect session state") {
+    assume(client2 != null, "Client not active.")
+
+    try {
+      waitFor(client2.submit(new Failure()))
+      fail("Job should have failued.")
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage().contains(classOf[Failure.JobFailureException].getName()))
+    }
+
+    val result = waitFor(client2.submit(new Echo("foo")))
+    assert(result === "foo")
+  }
+
+  test("return null should not throw NPE") {
+    assume(client2 != null, "Client not active")
+
+    val result = waitFor(client2.submit(new VoidJob()))
+    assert(result === null)
+  }
+
+  test("destroy the session") {
+    assume(client2 != null, "Client not active.")
+    client2.stop(true)
+
+    val list = sessionList()
+    assert(list.total === 0)
+
+    val sessionUri = s"$livyEndpoint/sessions/$sessionId"
+    Try(createClient(sessionUri)).foreach { client =>
+      client.stop(true)
+      fail("Should not have been able to connect to destroyed session.")
+    }
+
+    sessionId = -1
+  }
+
+  pytest("validate Python-API requests") {
+    val addFileContent = "hello from addfile"
+    val addFilePath = createTempFilesForTest("add_file", ".txt", addFileContent, true)
+    val addPyFileContent = "def test_add_pyfile(): return \"hello from addpyfile\""
+    val addPyFilePath = createTempFilesForTest("add_pyfile", ".py", addPyFileContent, true)
+    val uploadFileContent = "hello from uploadfile"
+    val uploadFilePath = createTempFilesForTest("upload_pyfile", ".py", uploadFileContent, false)
+    val uploadPyFileContent = "def test_upload_pyfile(): return \"hello from uploadpyfile\""
+    val uploadPyFilePath = createTempFilesForTest("upload_pyfile", ".py",
+      uploadPyFileContent, false)
+
+    val builder = new ProcessBuilder(Seq("python", createPyTestsForPythonAPI().toString).asJava)
+
+    val env = builder.environment()
+    env.put("LIVY_END_POINT", livyEndpoint)
+    env.put("ADD_FILE_URL", addFilePath)
+    env.put("ADD_PYFILE_URL", addPyFilePath)
+    env.put("UPLOAD_FILE_URL", uploadFilePath)
+    env.put("UPLOAD_PYFILE_URL", uploadPyFilePath)
+
+    builder.redirectOutput(new File(sys.props("java.io.tmpdir") + "/pytest_results.txt"))
+    builder.redirectErrorStream(true)
+
+    val process = builder.start()
+
+    process.waitFor()
+
+    assert(process.exitValue() === 0)
+  }
+
+  private def createPyTestsForPythonAPI(): File = {
+    var source: InputStream = null
+    try {
+      source = getClass.getClassLoader.getResourceAsStream("test_python_api.py")
+      val file = Files.createTempFile("", "").toFile
+      Files.copy(source, file.toPath, StandardCopyOption.REPLACE_EXISTING)
+      file
+    } finally {
+      source.close()
+    }
+  }
+
+  private def createTempFilesForTest(
+      fileName: String,
+      fileExtension: String,
+      fileContent: String,
+      uploadFileToHdfs: Boolean): String = {
+    val path = Files.createTempFile(fileName, fileExtension)
+    Files.write(path, fileContent.getBytes(UTF_8))
+    if (uploadFileToHdfs) {
+      uploadToHdfs(path.toFile())
+    } else {
+      path.toString
+    }
+  }
+
+  private def waitFor[T](future: JFuture[T]): T = {
+    future.get(30, TimeUnit.SECONDS)
+  }
+
+  private def sessionList(): SessionList = {
+    val response = httpClient.prepareGet(s"$livyEndpoint/sessions/").execute().get()
+    assert(response.getStatusCode === HttpServletResponse.SC_OK)
+    mapper.readValue(response.getResponseBodyAsStream, classOf[SessionList])
+  }
+
+  private def createClient(uri: String): LivyClient = {
+    new LivyClientBuilder().setURI(new URI(uri)).build()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala b/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
index 0501685..aa30b88 100644
--- a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
+++ b/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.cloudera.livy.test
+package org.apache.livy.test
 
 import java.io.File
 import java.net.URI
@@ -26,11 +26,11 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.scalatest.BeforeAndAfterAll
 
-import com.cloudera.livy._
-import com.cloudera.livy.client.common.HttpMessages._
-import com.cloudera.livy.sessions.SessionKindModule
-import com.cloudera.livy.test.framework.BaseIntegrationTestSuite
-import com.cloudera.livy.test.jobs.spark2._
+import org.apache.livy._
+import org.apache.livy.client.common.HttpMessages._
+import org.apache.livy.sessions.SessionKindModule
+import org.apache.livy.test.framework.BaseIntegrationTestSuite
+import org.apache.livy.test.jobs.spark2._
 
 class Spark2JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging {
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d139099..9c1263b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,9 +20,9 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-main</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <name>livy-main</name>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/python-api/pom.xml
----------------------------------------------------------------------
diff --git a/python-api/pom.xml b/python-api/pom.xml
index 3ea50c1..86379ce 100644
--- a/python-api/pom.xml
+++ b/python-api/pom.xml
@@ -21,15 +21,15 @@
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-main</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-python-api</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/python-api/setup.py
----------------------------------------------------------------------
diff --git a/python-api/setup.py b/python-api/setup.py
index 5f55ffd..a7e3aab 100644
--- a/python-api/setup.py
+++ b/python-api/setup.py
@@ -39,14 +39,14 @@ requirements = [
 
 setup(
     name='livy-python-api',
-    version="0.4.0-SNAPSHOT",
+    version="0.4.0-incubating-SNAPSHOT",
     packages=["livy", "livy-tests"],
     package_dir={
         "": "src/main/python",
         "livy-tests": "src/test/python/livy-tests",
     },
-    url='https://github.com/cloudera/livy',
-    author_email='livy-user@cloudera.org',
+    url='https://github.com/apache/incubator-livy',
+    author_email='user@livy.incubator.apache.org',
     license='Apache License, Version 2.0',
     description=DESCRIPTION,
     platforms=['any'],

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/pom.xml
----------------------------------------------------------------------
diff --git a/repl/pom.xml b/repl/pom.xml
index 3af51bd..14931ba 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -20,33 +20,33 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>multi-scala-project-root</artifactId>
     <relativePath>../scala/pom.xml</relativePath>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
   </parent>
 
   <artifactId>livy-repl-parent</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <dependencies>
 
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-rsc</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <exclusions>
         <!-- Provided and shaded by livy-rsc already. -->
         <exclusion>
-          <groupId>com.cloudera.livy</groupId>
+          <groupId>org.apache.livy</groupId>
           <artifactId>livy-client-common</artifactId>
         </exclusion>
       </exclusions>
@@ -189,7 +189,7 @@
               <relocations>
                 <relocation>
                   <pattern>org.json4s</pattern>
-                  <shadedPattern>com.cloudera.livy.shaded.json4s</shadedPattern>
+                  <shadedPattern>org.apache.livy.shaded.json4s</shadedPattern>
                 </relocation>
               </relocations>
             </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/scala-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/pom.xml b/repl/scala-2.10/pom.xml
index c81659b..4d84611 100644
--- a/repl/scala-2.10/pom.xml
+++ b/repl/scala-2.10/pom.xml
@@ -19,15 +19,15 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-repl_2.10</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-repl-parent</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala b/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
deleted file mode 100644
index dc761cc..0000000
--- a/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import java.io._
-import java.net.URLClassLoader
-import java.nio.file.Paths
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.JPrintWriter
-import scala.tools.nsc.interpreter.Results.Result
-import scala.util.{Failure, Success, Try}
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.repl.SparkIMain
-
-/**
- * This represents a Spark interpreter. It is not thread safe.
- */
-class SparkInterpreter(conf: SparkConf)
-  extends AbstractSparkInterpreter with SparkContextInitializer {
-
-  private var sparkIMain: SparkIMain = _
-  protected var sparkContext: SparkContext = _
-
-  override def start(): SparkContext = {
-    require(sparkIMain == null && sparkContext == null)
-
-    val settings = new Settings()
-    settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
-    settings.usejavacp.value = true
-
-    sparkIMain = new SparkIMain(settings, new JPrintWriter(outputStream, true))
-    sparkIMain.initializeSynchronous()
-
-    // Spark 1.6 does not have "classServerUri"; instead, the local directory where class files
-    // are stored needs to be registered in SparkConf. See comment in
-    // SparkILoop::createSparkContext().
-    Try(sparkIMain.getClass().getMethod("classServerUri")) match {
-      case Success(method) =>
-        method.setAccessible(true)
-        conf.set("spark.repl.class.uri", method.invoke(sparkIMain).asInstanceOf[String])
-
-      case Failure(_) =>
-        val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
-        outputDir.setAccessible(true)
-        conf.set("spark.repl.class.outputDir",
-          outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
-    }
-
-    restoreContextClassLoader {
-      // Call sparkIMain.setContextClassLoader() to make sure SparkContext and repl are using the
-      // same ClassLoader. Otherwise if someone defined a new class in interactive shell,
-      // SparkContext cannot see them and will result in job stage failure.
-      val setContextClassLoaderMethod = sparkIMain.getClass().getMethod("setContextClassLoader")
-      setContextClassLoaderMethod.setAccessible(true)
-      setContextClassLoaderMethod.invoke(sparkIMain)
-
-      // With usejavacp=true, the Scala interpreter looks for jars under System Classpath. But it
-      // doesn't look for jars added to MutableURLClassLoader. Thus extra jars are not visible to
-      // the interpreter. SparkContext can use them via JVM ClassLoaders but users cannot import
-      // them using Scala import statement.
-      //
-      // For instance: If we import a package using SparkConf:
-      // "spark.jars.packages": "com.databricks:spark-csv_2.10:1.4.0"
-      // then "import com.databricks.spark.csv._" in the interpreter, it will throw an error.
-      //
-      // Adding them to the interpreter manually to fix this issue.
-      var classLoader = Thread.currentThread().getContextClassLoader
-      while (classLoader != null) {
-        if (classLoader.getClass.getCanonicalName == "org.apache.spark.util.MutableURLClassLoader")
-        {
-          val extraJarPath = classLoader.asInstanceOf[URLClassLoader].getURLs()
-            // Check if the file exists. Otherwise an exception will be thrown.
-            .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
-            // Livy rsc and repl are also in the extra jars list. Filter them out.
-            .filterNot { u => Paths.get(u.toURI).getFileName.toString.startsWith("livy-") }
-            // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
-            .filterNot { u =>
-              Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
-            }
-
-          extraJarPath.foreach { p => debug(s"Adding $p to Scala interpreter's class path...") }
-          sparkIMain.addUrlsToClassPath(extraJarPath: _*)
-          classLoader = null
-        } else {
-          classLoader = classLoader.getParent
-        }
-      }
-
-      createSparkContext(conf)
-    }
-
-    sparkContext
-  }
-
-  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
-    sparkIMain.beQuietDuring {
-      sparkIMain.bind(name, tpe, value, modifier)
-    }
-  }
-
-  override def close(): Unit = synchronized {
-    if (sparkContext != null) {
-      sparkContext.stop()
-      sparkContext = null
-    }
-
-    if (sparkIMain != null) {
-      sparkIMain.close()
-      sparkIMain = null
-    }
-  }
-
-  override protected def isStarted(): Boolean = {
-    sparkContext != null && sparkIMain != null
-  }
-
-  override protected def interpret(code: String): Result = {
-    sparkIMain.interpret(code)
-  }
-
-  override protected[repl] def parseError(stdout: String): (String, Seq[String]) = {
-    // An example of Scala 2.10 runtime exception error message:
-    // java.lang.Exception: message
-    //     at $iwC$$iwC$$iwC$$iwC$$iwC.error(<console>:25)
-    //     at $iwC$$iwC$$iwC.error2(<console>:27)
-    //     at $iwC$$iwC.<init>(<console>:41)
-    //     at $iwC.<init>(<console>:43)
-    //     at <init>(<console>:45)
-    //     at .<init>(<console>:49)
-    //     at .<clinit>(<console>)
-    //     at .<init>(<console>:7)
-    //     at .<clinit>(<console>)
-    //     at $print(<console>)
-    //     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-    //     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
-    // ...
-
-    val (ename, traceback) = super.parseError(stdout)
-
-    // Remove internal frames.
-    val startOfInternalFrames = traceback.indexWhere(_.contains("$iwC$$iwC.<init>"))
-    var endOfInternalFrames = traceback.indexWhere(!_.trim.startsWith("at"), startOfInternalFrames)
-    if (endOfInternalFrames == -1) {
-      endOfInternalFrames = traceback.length
-    }
-
-    val cleanedTraceback = if (startOfInternalFrames == -1) {
-      traceback
-    } else {
-      traceback.view.zipWithIndex
-        .filterNot { z => z._2 >= startOfInternalFrames && z._2 < endOfInternalFrames }
-        .map { _._1.replaceAll("(\\$iwC\\$)*\\$iwC", "<user code>") }
-    }
-
-    (ename, cleanedTraceback)
-  }
-
-  override protected def valueOfTerm(name: String): Option[Any] = {
-    sparkIMain.valueOfTerm(name)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
new file mode 100644
index 0000000..f5b5b32
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import java.io._
+import java.net.URLClassLoader
+import java.nio.file.Paths
+
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.JPrintWriter
+import scala.tools.nsc.interpreter.Results.Result
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.repl.SparkIMain
+
+/**
+ * Scala 2.10 Spark interpreter backed by a SparkIMain instance. It is not thread safe.
+ */
+class SparkInterpreter(conf: SparkConf)
+  extends AbstractSparkInterpreter with SparkContextInitializer {
+
+  private var sparkIMain: SparkIMain = _ // null until start(); reset to null by close()
+  protected var sparkContext: SparkContext = _
+
+  override def start(): SparkContext = {
+    require(sparkIMain == null && sparkContext == null) // fails if already started
+
+    val settings = new Settings()
+    settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
+    settings.usejavacp.value = true
+
+    sparkIMain = new SparkIMain(settings, new JPrintWriter(outputStream, true))
+    sparkIMain.initializeSynchronous()
+
+    // Spark 1.6 does not have "classServerUri"; instead, the local directory where class files
+    // are stored needs to be registered in SparkConf. See comment in
+    // SparkILoop::createSparkContext().
+    Try(sparkIMain.getClass().getMethod("classServerUri")) match {
+      case Success(method) =>
+        method.setAccessible(true)
+        conf.set("spark.repl.class.uri", method.invoke(sparkIMain).asInstanceOf[String])
+
+      case Failure(_) =>
+        val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
+        outputDir.setAccessible(true)
+        conf.set("spark.repl.class.outputDir",
+          outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
+    }
+
+    restoreContextClassLoader {
+      // Call sparkIMain.setContextClassLoader() to make sure SparkContext and repl are using the
+      // same ClassLoader. Otherwise, if someone defines a new class in the interactive shell,
+      // SparkContext cannot see it, which results in job stage failure.
+      val setContextClassLoaderMethod = sparkIMain.getClass().getMethod("setContextClassLoader")
+      setContextClassLoaderMethod.setAccessible(true)
+      setContextClassLoaderMethod.invoke(sparkIMain)
+
+      // With usejavacp=true, the Scala interpreter looks for jars under System Classpath. But it
+      // doesn't look for jars added to MutableURLClassLoader. Thus extra jars are not visible to
+      // the interpreter. SparkContext can use them via JVM ClassLoaders but users cannot import
+      // them using a Scala import statement.
+      //
+      // For instance, if we add a package through SparkConf:
+      // "spark.jars.packages": "com.databricks:spark-csv_2.10:1.4.0"
+      // then run "import com.databricks.spark.csv._" in the interpreter, it will throw an error.
+      //
+      // Add them to the interpreter manually to fix this issue.
+      var classLoader = Thread.currentThread().getContextClassLoader
+      while (classLoader != null) {
+        if (classLoader.getClass.getCanonicalName == "org.apache.spark.util.MutableURLClassLoader")
+        {
+          val extraJarPath = classLoader.asInstanceOf[URLClassLoader].getURLs()
+            // Keep only existing local files. Otherwise an exception will be thrown.
+            .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+            // Livy rsc and repl are also in the extra jars list. Filter them out.
+            .filterNot { u => Paths.get(u.toURI).getFileName.toString.startsWith("livy-") }
+            // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+            .filterNot { u =>
+              Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+            }
+
+          extraJarPath.foreach { p => debug(s"Adding $p to Scala interpreter's class path...") }
+          sparkIMain.addUrlsToClassPath(extraJarPath: _*)
+          classLoader = null // Done: this ends the while loop.
+        } else {
+          classLoader = classLoader.getParent // Keep walking up the loader chain.
+        }
+      }
+
+      createSparkContext(conf)
+    }
+
+    sparkContext // presumably assigned by createSparkContext(conf) above - TODO confirm
+  }
+
+  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+    sparkIMain.beQuietDuring { // presumably silences REPL output while binding - confirm
+      sparkIMain.bind(name, tpe, value, modifier)
+    }
+  }
+
+  override def close(): Unit = synchronized {
+    if (sparkContext != null) { // Stop the context before closing the interpreter.
+      sparkContext.stop()
+      sparkContext = null
+    }
+
+    if (sparkIMain != null) {
+      sparkIMain.close()
+      sparkIMain = null
+    }
+  }
+
+  override protected def isStarted(): Boolean = {
+    sparkContext != null && sparkIMain != null // Both must exist.
+  }
+
+  override protected def interpret(code: String): Result = {
+    sparkIMain.interpret(code)
+  }
+
+  override protected[repl] def parseError(stdout: String): (String, Seq[String]) = {
+    // An example of Scala 2.10 runtime exception error message:
+    // java.lang.Exception: message
+    //     at $iwC$$iwC$$iwC$$iwC$$iwC.error(<console>:25)
+    //     at $iwC$$iwC$$iwC.error2(<console>:27)
+    //     at $iwC$$iwC.<init>(<console>:41)
+    //     at $iwC.<init>(<console>:43)
+    //     at <init>(<console>:45)
+    //     at .<init>(<console>:49)
+    //     at .<clinit>(<console>)
+    //     at .<init>(<console>:7)
+    //     at .<clinit>(<console>)
+    //     at $print(<console>)
+    //     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
+    //     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
+    // ...
+
+    val (ename, traceback) = super.parseError(stdout)
+
+    // Drop internal frames: the first "$iwC$$iwC.<init>" line and the "at ..." lines after it.
+    val startOfInternalFrames = traceback.indexWhere(_.contains("$iwC$$iwC.<init>"))
+    var endOfInternalFrames = traceback.indexWhere(!_.trim.startsWith("at"), startOfInternalFrames)
+    if (endOfInternalFrames == -1) { // "at" lines run to the end of the traceback
+      endOfInternalFrames = traceback.length
+    }
+
+    val cleanedTraceback = if (startOfInternalFrames == -1) { // no internal frames found
+      traceback
+    } else {
+      traceback.view.zipWithIndex
+        .filterNot { z => z._2 >= startOfInternalFrames && z._2 < endOfInternalFrames }
+        .map { _._1.replaceAll("(\\$iwC\\$)*\\$iwC", "<user code>") }
+    }
+
+    (ename, cleanedTraceback)
+  }
+
+  override protected def valueOfTerm(name: String): Option[Any] = {
+    sparkIMain.valueOfTerm(name)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala b/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
deleted file mode 100644
index 960b59e..0000000
--- a/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import org.scalatest._
-
-import com.cloudera.livy.LivyBaseUnitTestSuite
-
-class SparkInterpreterSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
-  describe("SparkInterpreter") {
-    val interpreter = new SparkInterpreter(null)
-
-    it("should parse Scala compile error.") {
-      // Regression test for LIVY-260.
-      val error =
-        """<console>:27: error: type mismatch;
-          | found   : Int
-          | required: String
-          |       sc.setJobGroup(groupName, groupName, true)
-          |                      ^
-          |<console>:27: error: type mismatch;
-          | found   : Int
-          | required: String
-          |       sc.setJobGroup(groupName, groupName, true)
-          |                                 ^
-          |""".stripMargin
-
-      val expectedTraceback = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(
-        """ found   : Int
-          | required: String
-          |       sc.setJobGroup(groupName, groupName, true)
-          |                      ^
-          |<console>:27: error: type mismatch;
-          | found   : Int
-          | required: String
-          |       sc.setJobGroup(groupName, groupName, true)
-          |                                 ^
-          |""".stripMargin)
-
-      val (ename, traceback) = interpreter.parseError(error)
-      ename shouldBe "<console>:27: error: type mismatch;"
-      traceback shouldBe expectedTraceback
-    }
-
-    it("should parse Scala runtime error and remove internal frames.") {
-      val error =
-        """java.lang.RuntimeException: message
-          |        at $iwC$$iwC$$iwC$$iwC$$iwC.error(<console>:25)
-          |        at $iwC$$iwC$$iwC.error2(<console>:27)
-          |        at $iwC$$iwC.<init>(<console>:41)
-          |        at $iwC.<init>(<console>:43)
-          |        at <init>(<console>:45)
-          |        at .<init>(<console>:49)
-          |        at .<clinit>(<console>)
-          |        at .<init>(<console>:7)
-          |        at .<clinit>(<console>)
-          |        at $print(<console>)
-          |        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-          |""".stripMargin
-
-      val expectedTraceback = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(
-        """        at <user code>.error(<console>:25)
-          |        at <user code>.error2(<console>:27)
-          |""".stripMargin)
-
-      val (ename, traceback) = interpreter.parseError(error)
-      ename shouldBe "java.lang.RuntimeException: message"
-      traceback shouldBe expectedTraceback
-    }
-  }
-}