You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:31 UTC

[02/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
new file mode 100644
index 0000000..e40bb1c
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+class BlackholeStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
+  describe("BlackholeStateStore") {
+    val stateStore = new BlackholeStateStore(new LivyConf())
+
+    it("set should not throw") {
+      stateStore.set("", 1.asInstanceOf[Object])
+    }
+
+    it("get should return None") {
+      val v = stateStore.get[Object]("")
+      v shouldBe None
+    }
+
+    it("getChildren should return empty list") {
+      val c = stateStore.getChildren("")
+      c shouldBe empty
+    }
+
+    it("remove should not throw") {
+      stateStore.remove("")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
new file mode 100644
index 0000000..4758c85
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import java.io.{FileNotFoundException, InputStream, IOException}
+import java.util
+
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.hamcrest.Description
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, anyInt, argThat, eq => equal}
+import org.mockito.Mockito.{atLeastOnce, verify, when}
+import org.mockito.internal.matchers.Equals
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
+  describe("FileSystemStateStore") {
+    def pathEq(wantedPath: String): Path = argThat(new ArgumentMatcher[Path] {
+      private val matcher = new Equals(wantedPath)
+
+      override def matches(path: Any): Boolean = matcher.matches(path.toString)
+
+      override def describeTo(d: Description): Unit = { matcher.describeTo(d) }
+    })
+
+    def makeConf(): LivyConf = {
+      val conf = new LivyConf()
+      conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/")
+
+      conf
+    }
+
+    def mockFileContext(rootDirPermission: String): FileContext = {
+      val fileContext = mock[FileContext]
+      val rootDirStatus = mock[FileStatus]
+      when(fileContext.getFileStatus(any())).thenReturn(rootDirStatus)
+      when(rootDirStatus.getPermission).thenReturn(new FsPermission(rootDirPermission))
+
+      fileContext
+    }
+
+    it("should throw if url is not configured") {
+      intercept[IllegalArgumentException](new FileSystemStateStore(new LivyConf()))
+    }
+
+    it("should set and verify file permission") {
+      val fileContext = mockFileContext("700")
+      new FileSystemStateStore(makeConf(), Some(fileContext))
+
+      verify(fileContext).setUMask(new FsPermission("077"))
+    }
+
+    it("should reject insecure permission") {
+      def test(permission: String): Unit = {
+        val fileContext = mockFileContext(permission)
+
+        intercept[IllegalArgumentException](new FileSystemStateStore(makeConf(), Some(fileContext)))
+      }
+      test("600")
+      test("400")
+      test("677")
+      test("670")
+      test("607")
+    }
+
+    it("set should write with an intermediate file") {
+      val fileContext = mockFileContext("700")
+      val outputStream = mock[FSDataOutputStream]
+      when(fileContext.create(pathEq("/key.tmp"), any[util.EnumSet[CreateFlag]], any[CreateOpts]))
+        .thenReturn(outputStream)
+
+      val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+
+      stateStore.set("key", "value")
+
+      verify(outputStream).write(""""value"""".getBytes)
+      verify(outputStream, atLeastOnce).close()
+
+
+      verify(fileContext).rename(pathEq("/key.tmp"), pathEq("/key"), equal(Rename.OVERWRITE))
+      verify(fileContext).delete(pathEq("/.key.tmp.crc"), equal(false))
+    }
+
+    it("get should read file") {
+      val fileContext = mockFileContext("700")
+      abstract class MockInputStream extends InputStream with Seekable with PositionedReadable {}
+      val inputStream: InputStream = mock[MockInputStream]
+      when(inputStream.read(any[Array[Byte]](), anyInt(), anyInt())).thenAnswer(new Answer[Int] {
+        private var firstCall = true
+        override def answer(invocation: InvocationOnMock): Int = {
+          if (firstCall) {
+            firstCall = false
+            val buf = invocation.getArguments()(0).asInstanceOf[Array[Byte]]
+            val b = """"value"""".getBytes()
+            b.copyToArray(buf)
+            b.length
+          } else {
+            -1
+          }
+        }
+      })
+
+      when(fileContext.open(pathEq("/key"))).thenReturn(new FSDataInputStream(inputStream))
+
+      val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+
+      stateStore.get[String]("key") shouldBe Some("value")
+
+      verify(inputStream, atLeastOnce).close()
+    }
+
+    it("get non-existent key should return None") {
+      val fileContext = mockFileContext("700")
+      when(fileContext.open(any())).thenThrow(new FileNotFoundException("Unit test"))
+
+      val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+
+      stateStore.get[String]("key") shouldBe None
+    }
+
+    it("getChildren should list file") {
+      val parentPath = "path"
+      def makeFileStatus(name: String): FileStatus = {
+        val fs = new FileStatus()
+        fs.setPath(new Path(parentPath, name))
+        fs
+      }
+      val children = Seq("c1", "c2")
+
+      val fileContext = mockFileContext("700")
+      val util = mock[FileContext#Util]
+      when(util.listStatus(pathEq(s"/$parentPath")))
+        .thenReturn(children.map(makeFileStatus).toArray)
+      when(fileContext.util()).thenReturn(util)
+
+      val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+      stateStore.getChildren(parentPath) should contain theSameElementsAs children
+    }
+
+    def getChildrenErrorTest(error: Exception): Unit = {
+      val parentPath = "path"
+
+      val fileContext = mockFileContext("700")
+      val util = mock[FileContext#Util]
+      when(util.listStatus(pathEq(s"/$parentPath"))).thenThrow(error)
+      when(fileContext.util()).thenReturn(util)
+
+      val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+      stateStore.getChildren(parentPath) shouldBe empty
+    }
+
+    it("getChildren should return empty list if the key doesn't exist") {
+      getChildrenErrorTest(new IOException("Unit test"))
+    }
+
+    it("getChildren should return empty list if key doesn't exist") {
+      getChildrenErrorTest(new FileNotFoundException("Unit test"))
+    }
+
+    it("remove should delete file") {
+      val fileContext = mockFileContext("700")
+
+      val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+      stateStore.remove("key")
+
+      verify(fileContext).delete(pathEq("/key"), equal(false))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala
new file mode 100644
index 0000000..5eeb2cf
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.util.Success
+
+import org.mockito.Mockito._
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.sessions.Session.RecoveryMetadata
+
+class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
+  describe("SessionStore") {
+    case class TestRecoveryMetadata(id: Int) extends RecoveryMetadata
+
+    val sessionType = "test"
+    val sessionPath = s"v1/$sessionType"
+    val sessionManagerPath = s"v1/$sessionType/state"
+
+    val conf = new LivyConf()
+    it("should set session state and session counter when saving a session.") {
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+
+      val m = TestRecoveryMetadata(99)
+      sessionStore.save(sessionType, m)
+      verify(stateStore).set(s"$sessionPath/99", m)
+    }
+
+    it("should return existing sessions") {
+      val validMetadata = Map(
+        "0" -> Some(TestRecoveryMetadata(0)),
+        "5" -> None,
+        "77" -> Some(TestRecoveryMetadata(77)))
+      val corruptedMetadata = Map(
+        "7" -> new RuntimeException("Test"),
+        "11212" -> new RuntimeException("Test")
+      )
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+      when(stateStore.getChildren(sessionPath))
+        .thenReturn((validMetadata ++ corruptedMetadata).keys.toList)
+
+      validMetadata.foreach { case (id, m) =>
+        when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenReturn(m)
+      }
+
+      corruptedMetadata.foreach { case (id, ex) =>
+        when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenThrow(ex)
+      }
+
+      val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType)
+      // Verify normal metadata are retrieved.
+      s.filter(_.isSuccess) should contain theSameElementsAs
+        validMetadata.values.filter(_.isDefined).map(m => Success(m.get))
+      // Verify exceptions are wrapped as in Try and are returned.
+      s.filter(_.isFailure) should have size corruptedMetadata.size
+    }
+
+    it("should not throw if the state store is empty") {
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+      when(stateStore.getChildren(sessionPath)).thenReturn(Seq.empty)
+
+      val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType)
+      s.filter(_.isSuccess) shouldBe empty
+    }
+
+    it("should return correct next session id") {
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+
+      when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(None)
+      sessionStore.getNextSessionId(sessionType) shouldBe 0
+
+      val sms = SessionManagerState(100)
+      when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(Some(sms))
+      sessionStore.getNextSessionId(sessionType) shouldBe sms.nextSessionId
+    }
+
+    it("should remove session") {
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+      val id = 1
+
+      sessionStore.remove(sessionType, 1)
+      verify(stateStore).remove(s"$sessionPath/$id")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala
new file mode 100644
index 0000000..c7040a5
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.reflect.classTag
+
+import org.scalatest.{BeforeAndAfter, FunSpec}
+import org.scalatest.Matchers._
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.sessions.SessionManager
+
+class StateStoreSpec extends FunSpec with BeforeAndAfter with LivyBaseUnitTestSuite {
+  describe("StateStore") {
+    after {
+      StateStore.cleanup()
+    }
+
+    def createConf(stateStore: String): LivyConf = {
+      val conf = new LivyConf()
+      conf.set(LivyConf.RECOVERY_MODE.key, SessionManager.SESSION_RECOVERY_MODE_RECOVERY)
+      conf.set(LivyConf.RECOVERY_STATE_STORE.key, stateStore)
+      conf
+    }
+
+    it("should throw an error on get if it's not initialized") {
+      intercept[AssertionError] { StateStore.get }
+    }
+
+    it("should initialize blackhole state store if recovery is disabled") {
+      StateStore.init(new LivyConf())
+      StateStore.get shouldBe a[BlackholeStateStore]
+    }
+
+    it("should pick the correct store according to state store config") {
+      StateStore.pickStateStore(createConf("filesystem")) shouldBe classOf[FileSystemStateStore]
+      StateStore.pickStateStore(createConf("zookeeper")) shouldBe classOf[ZooKeeperStateStore]
+    }
+
+    it("should return error if an unknown recovery mode is set") {
+      val conf = new LivyConf()
+      conf.set(LivyConf.RECOVERY_MODE.key, "unknown")
+      intercept[IllegalArgumentException] { StateStore.init(conf) }
+    }
+
+    it("should return error if an unknown state store is set") {
+      intercept[IllegalArgumentException] { StateStore.init(createConf("unknown")) }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
new file mode 100644
index 0000000..88e530f
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.collection.JavaConverters._
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.api._
+import org.apache.curator.framework.listen.Listenable
+import org.apache.zookeeper.data.Stat
+import org.mockito.Mockito._
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
+  describe("ZooKeeperStateStore") {
+    case class TestFixture(stateStore: ZooKeeperStateStore, curatorClient: CuratorFramework)
+    val conf = new LivyConf()
+    conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
+    val key = "key"
+    val prefixedKey = s"/livy/$key"
+
+    def withMock[R](testBody: TestFixture => R): R = {
+      val curatorClient = mock[CuratorFramework]
+      when(curatorClient.getUnhandledErrorListenable())
+        .thenReturn(mock[Listenable[UnhandledErrorListener]])
+      val stateStore = new ZooKeeperStateStore(conf, Some(curatorClient))
+      testBody(TestFixture(stateStore, curatorClient))
+    }
+
+    def mockExistsBuilder(curatorClient: CuratorFramework, exists: Boolean): Unit = {
+      val existsBuilder = mock[ExistsBuilder]
+      when(curatorClient.checkExists()).thenReturn(existsBuilder)
+      if (exists) {
+        when(existsBuilder.forPath(prefixedKey)).thenReturn(mock[Stat])
+      }
+    }
+
+    it("should throw on bad config") {
+      withMock { f =>
+        val conf = new LivyConf()
+        intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) }
+
+        conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
+        conf.set(ZooKeeperStateStore.ZK_RETRY_CONF, "bad")
+        intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) }
+      }
+    }
+
+    it("set should use curatorClient") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, true)
+
+        val setDataBuilder = mock[SetDataBuilder]
+        when(f.curatorClient.setData()).thenReturn(setDataBuilder)
+
+        f.stateStore.set("key", 1.asInstanceOf[Object])
+
+        verify(f.curatorClient).start()
+        verify(setDataBuilder).forPath(prefixedKey, Array[Byte](49))
+      }
+    }
+
+    it("set should create parents if they don't exist") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, false)
+
+        val createBuilder = mock[CreateBuilder]
+        when(f.curatorClient.create()).thenReturn(createBuilder)
+        val p = mock[ProtectACLCreateModePathAndBytesable[String]]
+        when(createBuilder.creatingParentsIfNeeded()).thenReturn(p)
+
+        f.stateStore.set("key", 1.asInstanceOf[Object])
+
+        verify(f.curatorClient).start()
+        verify(p).forPath(prefixedKey, Array[Byte](49))
+      }
+    }
+
+    it("get should retrieve retry policy configs") {
+      conf.set(org.apache.livy.server.recovery.ZooKeeperStateStore.ZK_RETRY_CONF, "11,77")
+        withMock { f =>
+        mockExistsBuilder(f.curatorClient, true)
+
+        f.stateStore.retryPolicy should not be null
+        f.stateStore.retryPolicy.getN shouldBe 11
+      }
+    }
+
+    it("get should retrieve data from curatorClient") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, true)
+
+        val getDataBuilder = mock[GetDataBuilder]
+        when(f.curatorClient.getData()).thenReturn(getDataBuilder)
+        when(getDataBuilder.forPath(prefixedKey)).thenReturn(Array[Byte](50))
+
+        val v = f.stateStore.get[Int]("key")
+
+        verify(f.curatorClient).start()
+        v shouldBe Some(2)
+      }
+    }
+
+    it("get should return None if key doesn't exist") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, false)
+
+        val v = f.stateStore.get[Int]("key")
+
+        verify(f.curatorClient).start()
+        v shouldBe None
+      }
+    }
+
+    it("getChildren should use curatorClient") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, true)
+
+        val getChildrenBuilder = mock[GetChildrenBuilder]
+        when(f.curatorClient.getChildren()).thenReturn(getChildrenBuilder)
+        val children = List("abc", "def")
+        when(getChildrenBuilder.forPath(prefixedKey)).thenReturn(children.asJava)
+
+        val c = f.stateStore.getChildren("key")
+
+        verify(f.curatorClient).start()
+        c shouldBe children
+      }
+    }
+
+    it("getChildren should return empty list if key doesn't exist") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, false)
+
+        val c = f.stateStore.getChildren("key")
+
+        verify(f.curatorClient).start()
+        c shouldBe empty
+      }
+    }
+
+    it("remove should use curatorClient") {
+      withMock { f =>
+        val deleteBuilder = mock[DeleteBuilder]
+        when(f.curatorClient.delete()).thenReturn(deleteBuilder)
+        val g = mock[ChildrenDeletable]
+        when(deleteBuilder.guaranteed()).thenReturn(g)
+
+        f.stateStore.remove(key)
+
+        verify(g).forPath(prefixedKey)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
new file mode 100644
index 0000000..3cfbe46
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.sessions
+
+import org.apache.livy.LivyConf
+
+class MockSession(id: Int, owner: String, conf: LivyConf) extends Session(id, owner, conf) {
+  case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata()
+
+  override val proxyUser = None
+
+  override protected def stopSession(): Unit = ()
+
+  override def logLines(): IndexedSeq[String] = IndexedSeq()
+
+  override def state: SessionState = SessionState.Idle()
+
+  override def recoveryMetadata: RecoveryMetadata = RecoveryMetadata(0)
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
new file mode 100644
index 0000000..beffa71
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.sessions
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Try}
+
+import org.mockito.Mockito.{doReturn, never, verify, when}
+import org.scalatest.{FunSpec, Matchers}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
+import org.apache.livy.server.interactive.InteractiveSession
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions.Session.RecoveryMetadata
+
+class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
+  implicit def executor: ExecutionContext = ExecutionContext.global
+
+  describe("SessionManager") {
+    it("should garbage collect old sessions") {
+      val livyConf = new LivyConf()
+      livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
+      val manager = new SessionManager[MockSession, RecoveryMetadata](
+        livyConf,
+        { _ => assert(false).asInstanceOf[MockSession] },
+        mock[SessionStore],
+        "test",
+        Some(Seq.empty))
+      val session = manager.register(new MockSession(manager.nextId(), null, livyConf))
+      manager.get(session.id).isDefined should be(true)
+      eventually(timeout(5 seconds), interval(100 millis)) {
+        Await.result(manager.collectGarbage(), Duration.Inf)
+        manager.get(session.id) should be(None)
+      }
+    }
+
+    it("batch session should not be gc-ed until application is finished") {
+      val sessionId = 24
+      val session = mock[BatchSession]
+      when(session.id).thenReturn(sessionId)
+      when(session.stop()).thenReturn(Future {})
+      when(session.lastActivity).thenReturn(System.nanoTime())
+
+      val conf = new LivyConf().set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+      val sm = new BatchSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+      testSessionGC(session, sm)
+    }
+
+    it("interactive session should not gc-ed if session timeout check is off") {
+      val sessionId = 24
+      val session = mock[InteractiveSession]
+      when(session.id).thenReturn(sessionId)
+      when(session.stop()).thenReturn(Future {})
+      when(session.lastActivity).thenReturn(System.nanoTime())
+
+      val conf = new LivyConf().set(LivyConf.SESSION_TIMEOUT_CHECK, false)
+        .set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+      val sm = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+      testSessionGC(session, sm)
+    }
+
+    def testSessionGC(session: Session, sm: SessionManager[_, _]): Unit = {
+
+      def changeStateAndCheck(s: SessionState)(fn: SessionManager[_, _] => Unit): Unit = {
+        doReturn(s).when(session).state
+        Await.result(sm.collectGarbage(), Duration.Inf)
+        fn(sm)
+      }
+
+      // Batch session should not be gc-ed when alive
+      for (s <- Seq(SessionState.Running(),
+        SessionState.Idle(),
+        SessionState.Recovering(),
+        SessionState.NotStarted(),
+        SessionState.Busy(),
+        SessionState.ShuttingDown())) {
+        changeStateAndCheck(s) { sm => sm.get(session.id) should be (Some(session)) }
+      }
+
+      // Stopped session should be gc-ed after retained timeout
+      for (s <- Seq(SessionState.Error(),
+        SessionState.Success(),
+        SessionState.Dead())) {
+        eventually(timeout(30 seconds), interval(100 millis)) {
+          changeStateAndCheck(s) { sm => sm.get(session.id) should be (None) }
+        }
+      }
+    }
+  }
+
+  describe("BatchSessionManager") {
+    implicit def executor: ExecutionContext = ExecutionContext.global
+
+    def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = {
+      BatchRecoveryMetadata(id, None, appTag, null, None)
+    }
+
+    def mockSession(id: Int): BatchSession = {
+      val session = mock[BatchSession]
+      when(session.id).thenReturn(id)
+      when(session.stop()).thenReturn(Future {})
+      when(session.lastActivity).thenReturn(System.nanoTime())
+
+      session
+    }
+
+    it("should not fail if state store is empty") {
+      val conf = new LivyConf()
+
+      val sessionStore = mock[SessionStore]
+      when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch"))
+        .thenReturn(Seq.empty)
+
+      val sm = new BatchSessionManager(conf, sessionStore)
+      sm.nextId() shouldBe 0
+    }
+
+    it("should recover sessions from state store") {
+      val conf = new LivyConf()
+      conf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster")
+
+      val sessionType = "batch"
+      val nextId = 99
+
+      val validMetadata = List(makeMetadata(0, "t1"), makeMetadata(77, "t2")).map(Try(_))
+      val invalidMetadata = List(Failure(new Exception("Fake invalid metadata")))
+      val sessionStore = mock[SessionStore]
+      when(sessionStore.getNextSessionId(sessionType)).thenReturn(nextId)
+      when(sessionStore.getAllSessions[BatchRecoveryMetadata](sessionType))
+        .thenReturn(validMetadata ++ invalidMetadata)
+
+      val sm = new BatchSessionManager(conf, sessionStore)
+      sm.nextId() shouldBe nextId
+      validMetadata.foreach { m =>
+        sm.get(m.get.id) shouldBe defined
+      }
+      sm.size shouldBe validMetadata.size
+    }
+
+    it("should delete sessions from state store") {
+      val conf = new LivyConf()
+
+      val sessionType = "batch"
+      val sessionId = 24
+      val sessionStore = mock[SessionStore]
+      val session = mockSession(sessionId)
+
+      val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session)))
+      sm.get(sessionId) shouldBe defined
+
+      Await.ready(sm.delete(sessionId).get, 30 seconds)
+
+      verify(sessionStore).remove(sessionType, sessionId)
+      sm.get(sessionId) shouldBe None
+    }
+
+    it("should delete sessions on shutdown when recovery is off") {
+      val conf = new LivyConf()
+      val sessionId = 24
+      val sessionStore = mock[SessionStore]
+      val session = mockSession(sessionId)
+
+      val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session)))
+      sm.get(sessionId) shouldBe defined
+      sm.shutdown()
+
+      verify(session).stop()
+    }
+
+    it("should not delete sessions on shutdown with recovery is on") {
+      val conf = new LivyConf()
+      conf.set(LivyConf.RECOVERY_MODE, SessionManager.SESSION_RECOVERY_MODE_RECOVERY)
+
+      val sessionId = 24
+      val sessionStore = mock[SessionStore]
+      val session = mockSession(sessionId)
+
+      val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session)))
+      sm.get(sessionId) shouldBe defined
+      sm.shutdown()
+
+      verify(session, never).stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala
new file mode 100644
index 0000000..a7bfaaa
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.sessions
+
+import java.net.URI
+
+import org.scalatest.FunSuite
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+class SessionSpec extends FunSuite with LivyBaseUnitTestSuite {
+
+  test("use default fs in paths") {
+    val conf = new LivyConf(false)
+    conf.hadoopConf.set("fs.defaultFS", "dummy:///")
+
+    val uris = Seq("http://example.com/foo", "hdfs:/bar", "/baz")
+    val expected = Seq(uris(0), uris(1), "dummy:///baz")
+    assert(Session.resolveURIs(uris, conf) === expected)
+
+    intercept[IllegalArgumentException] {
+      Session.resolveURI(new URI("relative_path"), conf)
+    }
+  }
+
+  test("local fs whitelist") {
+    val conf = new LivyConf(false)
+    conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed/,/also_allowed")
+
+    Seq("/allowed/file", "/also_allowed/file").foreach { path =>
+      assert(Session.resolveURI(new URI(path), conf) === new URI("file://" + path))
+    }
+
+    Seq("/not_allowed", "/allowed_not_really").foreach { path =>
+      intercept[IllegalArgumentException] {
+        Session.resolveURI(new URI(path), conf)
+      }
+    }
+  }
+
+  test("conf validation and preparation") {
+    val conf = new LivyConf(false)
+    conf.hadoopConf.set("fs.defaultFS", "dummy:///")
+    conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed")
+
+    // Test baseline.
+    assert(Session.prepareConf(Map(), Nil, Nil, Nil, Nil, conf) === Map("spark.master" -> "local"))
+
+    // Test validations.
+    intercept[IllegalArgumentException] {
+      Session.prepareConf(Map("spark.do_not_set" -> "1"), Nil, Nil, Nil, Nil, conf)
+    }
+    conf.sparkFileLists.foreach { key =>
+      intercept[IllegalArgumentException] {
+        Session.prepareConf(Map(key -> "file:/not_allowed"), Nil, Nil, Nil, Nil, conf)
+      }
+    }
+    intercept[IllegalArgumentException] {
+      Session.prepareConf(Map(), Seq("file:/not_allowed"), Nil, Nil, Nil, conf)
+    }
+    intercept[IllegalArgumentException] {
+      Session.prepareConf(Map(), Nil, Seq("file:/not_allowed"), Nil, Nil, conf)
+    }
+    intercept[IllegalArgumentException] {
+      Session.prepareConf(Map(), Nil, Nil, Seq("file:/not_allowed"), Nil, conf)
+    }
+    intercept[IllegalArgumentException] {
+      Session.prepareConf(Map(), Nil, Nil, Nil, Seq("file:/not_allowed"), conf)
+    }
+
+    // Test that file lists are merged and resolved.
+    val base = "/file1.txt"
+    val other = Seq("/file2.txt")
+    val expected = Some(Seq("dummy://" + other(0), "dummy://" + base).mkString(","))
+
+    val userLists = Seq(LivyConf.SPARK_JARS, LivyConf.SPARK_FILES, LivyConf.SPARK_ARCHIVES,
+      LivyConf.SPARK_PY_FILES)
+    val baseConf = userLists.map { key => (key -> base) }.toMap
+    val result = Session.prepareConf(baseConf, other, other, other, other, conf)
+    userLists.foreach { key => assert(result.get(key) === expected) }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala
new file mode 100644
index 0000000..eb5f30a
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.utils
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.LivyConf._
+import org.apache.livy.server.LivyServer
+
+class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSuite {
+
+  import LivySparkUtils._
+
+  private val livyConf = new LivyConf()
+  private val livyConf210 = new LivyConf()
+  livyConf210.set(LIVY_SPARK_SCALA_VERSION, "2.10.6")
+
+  private val livyConf211 = new LivyConf()
+  livyConf211.set(LIVY_SPARK_SCALA_VERSION, "2.11.1")
+
+  test("check for SPARK_HOME") {
+    testSparkHome(livyConf)
+  }
+
+  test("check spark-submit version") {
+    testSparkSubmit(livyConf)
+  }
+
+  test("should support Spark 1.6") {
+    testSparkVersion("1.6.0")
+    testSparkVersion("1.6.1")
+    testSparkVersion("1.6.1-SNAPSHOT")
+    testSparkVersion("1.6.2")
+    testSparkVersion("1.6")
+    testSparkVersion("1.6.3.2.5.0-12")
+  }
+
+  test("should support Spark 2.0.x") {
+    testSparkVersion("2.0.0")
+    testSparkVersion("2.0.1")
+    testSparkVersion("2.0.2")
+    testSparkVersion("2.0.3-SNAPSHOT")
+    testSparkVersion("2.0.0.2.5.1.0-56") // LIVY-229
+    testSparkVersion("2.0")
+    testSparkVersion("2.1.0")
+    testSparkVersion("2.1.1")
+  }
+
+  test("should not support Spark older than 1.6") {
+    intercept[IllegalArgumentException] { testSparkVersion("1.4.0") }
+    intercept[IllegalArgumentException] { testSparkVersion("1.5.0") }
+    intercept[IllegalArgumentException] { testSparkVersion("1.5.1") }
+    intercept[IllegalArgumentException] { testSparkVersion("1.5.2") }
+    intercept[IllegalArgumentException] { testSparkVersion("1.5.0-cdh5.6.1") }
+  }
+
+  test("should fail on bad version") {
+    intercept[IllegalArgumentException] { testSparkVersion("not a version") }
+  }
+
+  test("should error out if recovery is turned on but master isn't yarn") {
+    val livyConf = new LivyConf()
+    livyConf.set(LivyConf.LIVY_SPARK_MASTER, "local")
+    livyConf.set(LivyConf.RECOVERY_MODE, "recovery")
+    val s = new LivyServer()
+    intercept[IllegalArgumentException] { s.testRecovery(livyConf) }
+  }
+
+  test("formatScalaVersion() should format Scala version") {
+    formatScalaVersion("2.10.8") shouldBe "2.10"
+    formatScalaVersion("2.11.4") shouldBe "2.11"
+    formatScalaVersion("2.10") shouldBe "2.10"
+    formatScalaVersion("2.10.x.x.x.x") shouldBe "2.10"
+
+    // Throw exception for bad Scala version.
+    intercept[IllegalArgumentException] { formatScalaVersion("") }
+    intercept[IllegalArgumentException] { formatScalaVersion("xxx") }
+  }
+
+  test("defaultSparkScalaVersion() should return default Scala version") {
+    defaultSparkScalaVersion(formatSparkVersion("1.6.0")) shouldBe "2.10"
+    defaultSparkScalaVersion(formatSparkVersion("1.6.1")) shouldBe "2.10"
+    defaultSparkScalaVersion(formatSparkVersion("1.6.2")) shouldBe "2.10"
+    defaultSparkScalaVersion(formatSparkVersion("2.0.0")) shouldBe "2.11"
+    defaultSparkScalaVersion(formatSparkVersion("2.0.1")) shouldBe "2.11"
+
+    // Throw exception for unsupported Spark version.
+    intercept[IllegalArgumentException] { defaultSparkScalaVersion(formatSparkVersion("1.5.0")) }
+  }
+
+  test("sparkScalaVersion() should use spark-submit detected Scala version.") {
+    sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.10"), livyConf) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("1.6.0"), Some("2.11"), livyConf) shouldBe "2.11"
+  }
+
+  test("sparkScalaVersion() should throw if configured and detected Scala version mismatch.") {
+    intercept[IllegalArgumentException] {
+      sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.11"), livyConf210)
+    }
+    intercept[IllegalArgumentException] {
+      sparkScalaVersion(formatSparkVersion("1.6.1"), Some("2.10"), livyConf211)
+    }
+  }
+
+  test("sparkScalaVersion() should use configured Scala version if spark-submit doesn't tell.") {
+    sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf210) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf210) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf210) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf210) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf211) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf211) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf211) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf211) shouldBe "2.11"
+  }
+
+  test("sparkScalaVersion() should use default Spark Scala version.") {
+    sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("2.1.0"), None, livyConf) shouldBe "2.11"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
new file mode 100644
index 0000000..b3c50da
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.utils
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus.UNDEFINED
+import org.apache.hadoop.yarn.api.records.YarnApplicationState._
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.FunSpec
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.utils.SparkApp._
+
+class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
+  private def cleanupThread(t: Thread)(f: => Unit) = {
+    try { f } finally { t.interrupt() }
+  }
+
+  private def mockSleep(ms: Long) = {
+    Thread.`yield`()
+  }
+
+  describe("SparkYarnApp") {
+    val TEST_TIMEOUT = 30 seconds
+    val appId = ConverterUtils.toApplicationId("application_1467912463905_0021")
+    val appIdOption = Some(appId.toString)
+    val appTag = "fakeTag"
+    val livyConf = new LivyConf()
+    livyConf.set(LivyConf.YARN_APP_LOOKUP_TIMEOUT, "30s")
+
+    it("should poll YARN state and terminate") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockYarnClient = mock[YarnClient]
+        val mockAppListener = mock[SparkAppListener]
+
+        val mockAppReport = mock[ApplicationReport]
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        // Simulate YARN app state progression.
+        when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
+          private var stateSeq = List(ACCEPTED, RUNNING, FINISHED)
+
+          override def answer(invocation: InvocationOnMock): YarnApplicationState = {
+            val currentState = stateSeq.head
+            if (stateSeq.tail.nonEmpty) {
+              stateSeq = stateSeq.tail
+            }
+            currentState
+          }
+        })
+        when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+        val app = new SparkYarnApp(
+          appTag,
+          appIdOption,
+          None,
+          Some(mockAppListener),
+          livyConf,
+          mockYarnClient)
+        cleanupThread(app.yarnAppMonitorThread) {
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.yarnAppMonitorThread.isAlive,
+            "YarnAppMonitorThread should terminate after YARN app is finished.")
+          verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+          verify(mockAppListener).stateChanged(State.STARTING, State.RUNNING)
+          verify(mockAppListener).stateChanged(State.RUNNING, State.FINISHED)
+        }
+      }
+    }
+
+    it("should kill yarn app") {
+      Clock.withSleepMethod(mockSleep) {
+        val diag = "DIAG"
+        val mockYarnClient = mock[YarnClient]
+
+        val mockAppReport = mock[ApplicationReport]
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        when(mockAppReport.getDiagnostics).thenReturn(diag)
+        when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+
+        var appKilled = false
+        when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
+          override def answer(invocation: InvocationOnMock): YarnApplicationState = {
+            if (!appKilled) {
+              RUNNING
+            } else {
+              KILLED
+            }
+          }
+        })
+        when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+        val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf, mockYarnClient)
+        cleanupThread(app.yarnAppMonitorThread) {
+          app.kill()
+          appKilled = true
+
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.yarnAppMonitorThread.isAlive,
+            "YarnAppMonitorThread should terminate after YARN app is finished.")
+          verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+          verify(mockYarnClient).killApplication(appId)
+          assert(app.log().mkString.contains(diag))
+        }
+      }
+    }
+
+    it("should return spark-submit log") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockYarnClient = mock[YarnClient]
+        val mockSparkSubmit = mock[LineBufferedProcess]
+        val sparkSubmitInfoLog = IndexedSeq("SPARK-SUBMIT", "LOG")
+        val sparkSubmitErrorLog = IndexedSeq("SPARK-SUBMIT", "error log")
+        val sparkSubmitLog = ("stdout: " +: sparkSubmitInfoLog) ++
+          ("\nstderr: " +: sparkSubmitErrorLog) :+ "\nYARN Diagnostics: "
+        when(mockSparkSubmit.inputLines).thenReturn(sparkSubmitInfoLog)
+        when(mockSparkSubmit.errorLines).thenReturn(sparkSubmitErrorLog)
+        val waitForCalledLatch = new CountDownLatch(1)
+        when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
+          override def answer(invocation: InvocationOnMock): Int = {
+            waitForCalledLatch.countDown()
+            0
+          }
+        })
+
+        val mockAppReport = mock[ApplicationReport]
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        when(mockAppReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
+        when(mockAppReport.getDiagnostics).thenReturn(null)
+        when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+        val app = new SparkYarnApp(
+          appTag,
+          appIdOption,
+          Some(mockSparkSubmit),
+          None,
+          livyConf,
+          mockYarnClient)
+        cleanupThread(app.yarnAppMonitorThread) {
+          waitForCalledLatch.await(TEST_TIMEOUT.toMillis, TimeUnit.MILLISECONDS)
+          assert(app.log() == sparkSubmitLog, "Expect spark-submit log")
+        }
+      }
+    }
+
+    it("can kill spark-submit while it's running") {
+      Clock.withSleepMethod(mockSleep) {
+        val livyConf = new LivyConf()
+        livyConf.set(LivyConf.YARN_APP_LOOKUP_TIMEOUT, "0")
+
+        val mockYarnClient = mock[YarnClient]
+        val mockSparkSubmit = mock[LineBufferedProcess]
+
+        val sparkSubmitRunningLatch = new CountDownLatch(1)
+        // Simulate a running spark-submit
+        when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
+          override def answer(invocation: InvocationOnMock): Int = {
+            sparkSubmitRunningLatch.await()
+            0
+          }
+        })
+
+        val app = new SparkYarnApp(
+          appTag,
+          appIdOption,
+          Some(mockSparkSubmit),
+          None,
+          livyConf,
+          mockYarnClient)
+        cleanupThread(app.yarnAppMonitorThread) {
+          app.kill()
+          verify(mockSparkSubmit, times(1)).destroy()
+          sparkSubmitRunningLatch.countDown()
+        }
+      }
+    }
+
+    it("should map YARN state to SparkApp.State correctly") {
+      val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf)
+      cleanupThread(app.yarnAppMonitorThread) {
+        assert(app.mapYarnState(appId, NEW, UNDEFINED) == State.STARTING)
+        assert(app.mapYarnState(appId, NEW_SAVING, UNDEFINED) == State.STARTING)
+        assert(app.mapYarnState(appId, SUBMITTED, UNDEFINED) == State.STARTING)
+        assert(app.mapYarnState(appId, ACCEPTED, UNDEFINED) == State.STARTING)
+        assert(app.mapYarnState(appId, RUNNING, UNDEFINED) == State.RUNNING)
+        assert(
+          app.mapYarnState(appId, FINISHED, FinalApplicationStatus.SUCCEEDED) == State.FINISHED)
+        assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.FAILED) == State.FAILED)
+        assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.KILLED) == State.KILLED)
+        assert(app.mapYarnState(appId, FINISHED, UNDEFINED) == State.FAILED)
+        assert(app.mapYarnState(appId, FAILED, UNDEFINED) == State.FAILED)
+        assert(app.mapYarnState(appId, KILLED, UNDEFINED) == State.KILLED)
+      }
+    }
+
+    it("should expose driver log url and Spark UI url") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockYarnClient = mock[YarnClient]
+        val driverLogUrl = "DRIVER LOG URL"
+        val sparkUiUrl = "SPARK UI URL"
+
+        val mockApplicationAttemptId = mock[ApplicationAttemptId]
+        val mockAppReport = mock[ApplicationReport]
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        when(mockAppReport.getTrackingUrl).thenReturn(sparkUiUrl)
+        when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
+        var done = false
+        when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
+          override def answer(invocation: InvocationOnMock): YarnApplicationState = {
+            if (!done) {
+              RUNNING
+            } else {
+              FINISHED
+            }
+          }
+        })
+        when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+        val mockAttemptReport = mock[ApplicationAttemptReport]
+        val mockContainerId = mock[ContainerId]
+        when(mockAttemptReport.getAMContainerId).thenReturn(mockContainerId)
+        when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId))
+          .thenReturn(mockAttemptReport)
+
+        val mockContainerReport = mock[ContainerReport]
+        when(mockYarnClient.getContainerReport(mockContainerId)).thenReturn(mockContainerReport)
+
+        // Block test until getLogUrl is called 10 times.
+        val getLogUrlCountDown = new CountDownLatch(10)
+        when(mockContainerReport.getLogUrl).thenAnswer(new Answer[String] {
+          override def answer(invocation: InvocationOnMock): String = {
+            getLogUrlCountDown.countDown()
+            driverLogUrl
+          }
+        })
+
+        val mockListener = mock[SparkAppListener]
+
+        val app = new SparkYarnApp(
+          appTag, appIdOption, None, Some(mockListener), livyConf, mockYarnClient)
+        cleanupThread(app.yarnAppMonitorThread) {
+          getLogUrlCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+          done = true
+
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.yarnAppMonitorThread.isAlive,
+            "YarnAppMonitorThread should terminate after YARN app is finished.")
+
+          verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+          verify(mockAppReport, atLeast(1)).getTrackingUrl()
+          verify(mockContainerReport, atLeast(1)).getLogUrl()
+          verify(mockListener).appIdKnown(appId.toString)
+          verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl)))
+        }
+      }
+    }
+
+    it("should not die on YARN-4411") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockYarnClient = mock[YarnClient]
+
+        // Block test until getApplicationReport is called 10 times.
+        val pollCountDown = new CountDownLatch(10)
+        when(mockYarnClient.getApplicationReport(appId)).thenAnswer(new Answer[ApplicationReport] {
+          override def answer(invocation: InvocationOnMock): ApplicationReport = {
+            pollCountDown.countDown()
+            throw new IllegalArgumentException("No enum constant " +
+              "org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState.FINAL_SAVING")
+          }
+        })
+
+        val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf, mockYarnClient)
+        cleanupThread(app.yarnAppMonitorThread) {
+          pollCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+          assert(app.state == SparkApp.State.STARTING)
+
+          app.state = SparkApp.State.FINISHED
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+        }
+      }
+    }
+
+    it("should not die on ApplicationAttemptNotFoundException") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockYarnClient = mock[YarnClient]
+        val mockAppReport = mock[ApplicationReport]
+        val mockApplicationAttemptId = mock[ApplicationAttemptId]
+        var done = false
+
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        when(mockAppReport.getYarnApplicationState).thenAnswer(
+          new Answer[YarnApplicationState]() {
+            override def answer(invocation: InvocationOnMock): YarnApplicationState = {
+              if (done) {
+                FINISHED
+              } else {
+                RUNNING
+              }
+            }
+          })
+        when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
+        when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+        // Block test until getApplicationReport is called 10 times.
+        val pollCountDown = new CountDownLatch(10)
+        when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId)).thenAnswer(
+          new Answer[ApplicationReport] {
+            override def answer(invocation: InvocationOnMock): ApplicationReport = {
+              pollCountDown.countDown()
+              throw new ApplicationAttemptNotFoundException("unit test")
+            }
+          })
+
+        val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf, mockYarnClient)
+        cleanupThread(app.yarnAppMonitorThread) {
+          pollCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+          assert(app.state == SparkApp.State.RUNNING)
+          done = true
+
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/pom.xml
----------------------------------------------------------------------
diff --git a/test-lib/pom.xml b/test-lib/pom.xml
index 131e0e6..c380764 100644
--- a/test-lib/pom.xml
+++ b/test-lib/pom.xml
@@ -18,14 +18,14 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-main</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
   </parent>
 
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-test-lib</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <properties>
@@ -34,7 +34,7 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-api</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/apps/FailingApp.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/apps/FailingApp.java b/test-lib/src/main/java/com/cloudera/livy/test/apps/FailingApp.java
deleted file mode 100644
index 51ba795..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/apps/FailingApp.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.apps;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class FailingApp {
-
-  public static void main(String[] args) throws Exception {
-    if (args.length != 1) {
-      throw new IllegalArgumentException("Missing output path.");
-    }
-    String output = args[0];
-
-    FileSystem fs = FileSystem.get(new Configuration());
-    Path out = new Path(output);
-    fs.create(out).close();
-
-    throw new IllegalStateException("This app always fails.");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/apps/SimpleSparkApp.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/apps/SimpleSparkApp.java b/test-lib/src/main/java/com/cloudera/livy/test/apps/SimpleSparkApp.java
deleted file mode 100644
index 922066e..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/apps/SimpleSparkApp.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.apps;
-
-import java.util.Arrays;
-import java.util.List;
-
-import scala.Tuple2;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-
-public class SimpleSparkApp {
-
-  public static void main(String[] args) throws Exception {
-    if (args.length < 1 || args.length > 2) {
-      throw new IllegalArgumentException(
-        "Invalid arguments. <output path> [exit after output=true]>");
-    }
-
-    String output = args[0];
-    Boolean exitAfterOutput = true;
-    if (args.length == 2) {
-      exitAfterOutput = Boolean.parseBoolean(args[1]);
-    }
-
-    JavaSparkContext sc = new JavaSparkContext();
-    try {
-      List<String> data = Arrays.asList("the", "quick", "brown", "fox", "jumped", "over", "the",
-        "lazy", "dog");
-
-      JavaPairRDD<String, Integer> rdd = sc.parallelize(data, 3)
-        .mapToPair(new Counter());
-      rdd.saveAsTextFile(output);
-
-      if (!exitAfterOutput) {
-        while (true) {
-          Thread.sleep(60 * 60 * 1000);
-        }
-      }
-    } finally {
-      sc.close();
-    }
-  }
-
-  private static class Counter implements PairFunction<String, String, Integer> {
-
-    @Override
-    public Tuple2<String, Integer> call(String s) throws Exception {
-      return new Tuple2<>(s, s.length());
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/Echo.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Echo.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/Echo.java
deleted file mode 100644
index 7ea3aa7..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Echo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class Echo<T> implements Job<T> {
-
-  private final T value;
-
-  public Echo(T value) {
-    this.value = value;
-  }
-
-  @Override
-  public T call(JobContext jc) {
-    return value;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/Failure.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Failure.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/Failure.java
deleted file mode 100644
index 9232347..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Failure.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class Failure implements Job<Void> {
-
-  @Override
-  public Void call(JobContext jc) {
-    throw new JobFailureException();
-  }
-
-  public static class JobFailureException extends RuntimeException {
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java
deleted file mode 100644
index 3bca2f3..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import org.apache.spark.SparkFiles;
-import org.apache.spark.api.java.function.Function;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class FileReader implements Job<String> {
-
-  private final boolean isResource;
-  private final String fileName;
-
-  public FileReader(String fileName, boolean isResource) {
-    this.fileName = fileName;
-    this.isResource = isResource;
-  }
-
-  @Override
-  public String call(JobContext jc) {
-    return jc.sc().parallelize(Arrays.asList(1)).map(new Reader()).collect().get(0);
-  }
-
-  private class Reader implements Function<Integer, String> {
-
-    @Override
-    public String call(Integer i) throws Exception {
-      InputStream in;
-      if (isResource) {
-        ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-        in = ccl.getResourceAsStream(fileName);
-        if (in == null) {
-          throw new IOException("Resource not found: " + fileName);
-        }
-      } else {
-        in = new FileInputStream(SparkFiles.get(fileName));
-      }
-      try {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        byte[] buf = new byte[1024];
-        int read;
-        while ((read = in.read(buf)) >= 0) {
-          out.write(buf, 0, read);
-        }
-        byte[] bytes = out.toByteArray();
-        return new String(bytes, 0, bytes.length, UTF_8);
-      } finally {
-        in.close();
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/GetCurrentUser.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/GetCurrentUser.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/GetCurrentUser.java
deleted file mode 100644
index 5919896..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/GetCurrentUser.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import org.apache.hadoop.security.UserGroupInformation;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class GetCurrentUser implements Job<String> {
-
-  @Override
-  public String call(JobContext jc) throws Exception {
-    return UserGroupInformation.getCurrentUser().getUserName();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/SQLGetTweets.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/SQLGetTweets.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/SQLGetTweets.java
deleted file mode 100644
index 15aa79b..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/SQLGetTweets.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import java.io.File;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class SQLGetTweets implements Job<List<String>> {
-
-  private final boolean useHiveContext;
-
-  public SQLGetTweets(boolean useHiveContext) {
-    this.useHiveContext = useHiveContext;
-  }
-
-  @Override
-  public List<String> call(JobContext jc) throws Exception {
-    InputStream source = getClass().getResourceAsStream("/testweet.json");
-
-    // Save the resource as a file in HDFS (or the local tmp dir when using a local filesystem).
-    URI input;
-    File local = File.createTempFile("tweets", ".json", jc.getLocalTmpDir());
-    Files.copy(source, local.toPath(), StandardCopyOption.REPLACE_EXISTING);
-    FileSystem fs = FileSystem.get(jc.sc().sc().hadoopConfiguration());
-    if ("file".equals(fs.getUri().getScheme())) {
-      input = local.toURI();
-    } else {
-      String uuid = UUID.randomUUID().toString();
-      Path target = new Path("/tmp/" + uuid + "-tweets.json");
-      fs.copyFromLocalFile(new Path(local.toURI()), target);
-      input = target.toUri();
-    }
-
-    SQLContext sqlctx = useHiveContext ? jc.hivectx() : jc.sqlctx();
-    sqlctx.jsonFile(input.toString()).registerTempTable("tweets");
-
-    List<String> tweetList = new ArrayList<>();
-    Row[] result =
-      (Row[])(sqlctx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
-        .collect());
-    for (Row r : result) {
-       tweetList.add(r.toString());
-    }
-    return tweetList;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java
deleted file mode 100644
index a3c33d4..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class Sleeper implements Job<Void> {
-
-  private final long millis;
-
-  public Sleeper(long millis) {
-    this.millis = millis;
-  }
-
-  @Override
-  public Void call(JobContext jc) throws Exception {
-    Thread.sleep(millis);
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/SmallCount.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/SmallCount.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/SmallCount.java
deleted file mode 100644
index fb74027..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/SmallCount.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class SmallCount implements Job<Long> {
-
-  private final int count;
-
-  public SmallCount(int count) {
-    this.count = count;
-  }
-
-  @Override
-  public Long call(JobContext jc) {
-    Random r = new Random();
-    int partitions = Math.min(r.nextInt(10) + 1, count);
-
-    List<Integer> elements = new ArrayList<>(count);
-    for (int i = 0; i < count; i++) {
-      elements.add(r.nextInt());
-    }
-
-    return jc.sc().parallelize(elements, partitions).count();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/VoidJob.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/VoidJob.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/VoidJob.java
deleted file mode 100644
index 0333ccf..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/VoidJob.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class VoidJob implements Job<Void> {
-  @Override
-  public Void call(JobContext jc) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/apps/FailingApp.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/org/apache/livy/test/apps/FailingApp.java b/test-lib/src/main/java/org/apache/livy/test/apps/FailingApp.java
new file mode 100644
index 0000000..6fa4850
--- /dev/null
+++ b/test-lib/src/main/java/org/apache/livy/test/apps/FailingApp.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.apps;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FailingApp {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length != 1) {
+      throw new IllegalArgumentException("Missing output path.");
+    }
+    String output = args[0];
+
+    FileSystem fs = FileSystem.get(new Configuration());
+    Path out = new Path(output);
+    fs.create(out).close();
+
+    throw new IllegalStateException("This app always fails.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java b/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java
new file mode 100644
index 0000000..ef8e1a6
--- /dev/null
+++ b/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.apps;
+
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Tuple2;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+
+public class SimpleSparkApp {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 1 || args.length > 2) {
+      throw new IllegalArgumentException(
+        "Invalid arguments. <output path> [exit after output=true]>");
+    }
+
+    String output = args[0];
+    Boolean exitAfterOutput = true;
+    if (args.length == 2) {
+      exitAfterOutput = Boolean.parseBoolean(args[1]);
+    }
+
+    JavaSparkContext sc = new JavaSparkContext();
+    try {
+      List<String> data = Arrays.asList("the", "quick", "brown", "fox", "jumped", "over", "the",
+        "lazy", "dog");
+
+      JavaPairRDD<String, Integer> rdd = sc.parallelize(data, 3)
+        .mapToPair(new Counter());
+      rdd.saveAsTextFile(output);
+
+      if (!exitAfterOutput) {
+        while (true) {
+          Thread.sleep(60 * 60 * 1000);
+        }
+      }
+    } finally {
+      sc.close();
+    }
+  }
+
+  private static class Counter implements PairFunction<String, String, Integer> {
+
+    @Override
+    public Tuple2<String, Integer> call(String s) throws Exception {
+      return new Tuple2<>(s, s.length());
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/jobs/Echo.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/Echo.java b/test-lib/src/main/java/org/apache/livy/test/jobs/Echo.java
new file mode 100644
index 0000000..3c3c834
--- /dev/null
+++ b/test-lib/src/main/java/org/apache/livy/test/jobs/Echo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.jobs;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+public class Echo<T> implements Job<T> {
+
+  private final T value;
+
+  public Echo(T value) {
+    this.value = value;
+  }
+
+  @Override
+  public T call(JobContext jc) {
+    return value;
+  }
+
+}