You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:31 UTC
[02/33] incubator-livy git commit: LIVY-375. Change Livy code package
name to org.apache.livy
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
new file mode 100644
index 0000000..e40bb1c
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+/**
+ * Smoke tests for [[BlackholeStateStore]]: `set` and `remove` must not throw, `get` must
+ * return None and `getChildren` must return an empty collection.
+ */
+class BlackholeStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
+  describe("BlackholeStateStore") {
+    // One store instance is shared by all tests in this suite.
+    val blackhole = new BlackholeStateStore(new LivyConf())
+
+    it("set should not throw") {
+      // The value is handed over as a boxed java.lang.Integer.
+      blackhole.set("", Int.box(1))
+    }
+
+    it("get should return None") {
+      blackhole.get[Object]("") shouldBe None
+    }
+
+    it("getChildren should return empty list") {
+      blackhole.getChildren("") shouldBe empty
+    }
+
+    it("remove should not throw") {
+      blackhole.remove("")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
new file mode 100644
index 0000000..4758c85
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import java.io.{FileNotFoundException, InputStream, IOException}
+import java.util
+
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.hamcrest.Description
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, anyInt, argThat, eq => equal}
+import org.mockito.Mockito.{atLeastOnce, verify, when}
+import org.mockito.internal.matchers.Equals
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+// Unit tests for FileSystemStateStore. Every test that builds a store injects a
+// Mockito-mocked Hadoop FileContext, so the assertions are about the calls made on that mock.
+class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
+ describe("FileSystemStateStore") {
+ // Mockito argument matcher that accepts an argument whose toString equals wantedPath.
+ def pathEq(wantedPath: String): Path = argThat(new ArgumentMatcher[Path] {
+ private val matcher = new Equals(wantedPath)
+
+ override def matches(path: Any): Boolean = matcher.matches(path.toString)
+
+ override def describeTo(d: Description): Unit = { matcher.describeTo(d) }
+ })
+
+ // Builds a LivyConf whose state store URL is set to "file://tmp/".
+ def makeConf(): LivyConf = {
+ val conf = new LivyConf()
+ conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/")
+
+ conf
+ }
+
+ // Returns a mocked FileContext whose getFileStatus(...) answers, for any path, with a
+ // status carrying the given permission string (e.g. "700").
+ def mockFileContext(rootDirPermission: String): FileContext = {
+ val fileContext = mock[FileContext]
+ val rootDirStatus = mock[FileStatus]
+ when(fileContext.getFileStatus(any())).thenReturn(rootDirStatus)
+ when(rootDirStatus.getPermission).thenReturn(new FsPermission(rootDirPermission))
+
+ fileContext
+ }
+
+ it("should throw if url is not configured") {
+ intercept[IllegalArgumentException](new FileSystemStateStore(new LivyConf()))
+ }
+
+ it("should set and verify file permission") {
+ val fileContext = mockFileContext("700")
+ new FileSystemStateStore(makeConf(), Some(fileContext))
+
+ // Constructing the store is expected to set the umask to 077 on the file context.
+ verify(fileContext).setUMask(new FsPermission("077"))
+ }
+
+ it("should reject insecure permission") {
+ // Constructing a store over a root directory with this permission must fail.
+ def test(permission: String): Unit = {
+ val fileContext = mockFileContext(permission)
+
+ intercept[IllegalArgumentException](new FileSystemStateStore(makeConf(), Some(fileContext)))
+ }
+ test("600")
+ test("400")
+ test("677")
+ test("670")
+ test("607")
+ }
+
+ it("set should write with an intermediate file") {
+ val fileContext = mockFileContext("700")
+ val outputStream = mock[FSDataOutputStream]
+ when(fileContext.create(pathEq("/key.tmp"), any[util.EnumSet[CreateFlag]], any[CreateOpts]))
+ .thenReturn(outputStream)
+
+ val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+
+ stateStore.set("key", "value")
+
+ // The bytes written are the string "value" including its surrounding double quotes.
+ verify(outputStream).write(""""value"""".getBytes)
+ verify(outputStream, atLeastOnce).close()
+
+
+ // The temporary file must be renamed over the final key and its .crc file deleted.
+ verify(fileContext).rename(pathEq("/key.tmp"), pathEq("/key"), equal(Rename.OVERWRITE))
+ verify(fileContext).delete(pathEq("/.key.tmp.crc"), equal(false))
+ }
+
+ it("get should read file") {
+ val fileContext = mockFileContext("700")
+ // FSDataInputStream is given a stream that is also Seekable and PositionedReadable.
+ abstract class MockInputStream extends InputStream with Seekable with PositionedReadable {}
+ val inputStream: InputStream = mock[MockInputStream]
+ // The first read copies the bytes of "value" (with its double quotes) into the caller's
+ // buffer and returns their count; every later read returns -1 (end of stream).
+ when(inputStream.read(any[Array[Byte]](), anyInt(), anyInt())).thenAnswer(new Answer[Int] {
+ private var firstCall = true
+ override def answer(invocation: InvocationOnMock): Int = {
+ if (firstCall) {
+ firstCall = false
+ val buf = invocation.getArguments()(0).asInstanceOf[Array[Byte]]
+ val b = """"value"""".getBytes()
+ b.copyToArray(buf)
+ b.length
+ } else {
+ -1
+ }
+ }
+ })
+
+ when(fileContext.open(pathEq("/key"))).thenReturn(new FSDataInputStream(inputStream))
+
+ val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+
+ stateStore.get[String]("key") shouldBe Some("value")
+
+ verify(inputStream, atLeastOnce).close()
+ }
+
+ it("get non-existent key should return None") {
+ val fileContext = mockFileContext("700")
+ when(fileContext.open(any())).thenThrow(new FileNotFoundException("Unit test"))
+
+ val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+
+ stateStore.get[String]("key") shouldBe None
+ }
+
+ it("getChildren should list file") {
+ val parentPath = "path"
+ // Builds a FileStatus whose path is <parentPath>/<name>.
+ def makeFileStatus(name: String): FileStatus = {
+ val fs = new FileStatus()
+ fs.setPath(new Path(parentPath, name))
+ fs
+ }
+ val children = Seq("c1", "c2")
+
+ val fileContext = mockFileContext("700")
+ // This local `util` is a mocked FileContext#Util, not the java.util package imported above.
+ val util = mock[FileContext#Util]
+ when(util.listStatus(pathEq(s"/$parentPath")))
+ .thenReturn(children.map(makeFileStatus).toArray)
+ when(fileContext.util()).thenReturn(util)
+
+ val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+ stateStore.getChildren(parentPath) should contain theSameElementsAs children
+ }
+
+ // Shared body for the two tests below: listing the parent path throws `error`, and
+ // getChildren is expected to return an empty collection instead of propagating it.
+ def getChildrenErrorTest(error: Exception): Unit = {
+ val parentPath = "path"
+
+ val fileContext = mockFileContext("700")
+ val util = mock[FileContext#Util]
+ when(util.listStatus(pathEq(s"/$parentPath"))).thenThrow(error)
+ when(fileContext.util()).thenReturn(util)
+
+ val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+ stateStore.getChildren(parentPath) shouldBe empty
+ }
+
+ // NOTE(review): this test exercises a generic IOException, but its name is almost identical
+ // to the FileNotFoundException test below (they differ only by "the") -- consider renaming.
+ it("getChildren should return empty list if the key doesn't exist") {
+ getChildrenErrorTest(new IOException("Unit test"))
+ }
+
+ it("getChildren should return empty list if key doesn't exist") {
+ getChildrenErrorTest(new FileNotFoundException("Unit test"))
+ }
+
+ it("remove should delete file") {
+ val fileContext = mockFileContext("700")
+
+ val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
+ stateStore.remove("key")
+
+ // The delete is expected to be issued with the boolean flag set to false.
+ verify(fileContext).delete(pathEq("/key"), equal(false))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala
new file mode 100644
index 0000000..5eeb2cf
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/SessionStoreSpec.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.util.Success
+
+import org.mockito.Mockito._
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.sessions.Session.RecoveryMetadata
+
+/**
+ * Unit tests for [[SessionStore]], run against a Mockito-mocked [[StateStore]] so that only
+ * the keys and values SessionStore passes to the underlying store are checked.
+ */
+class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
+  describe("SessionStore") {
+    case class TestRecoveryMetadata(id: Int) extends RecoveryMetadata
+
+    val sessionType = "test"
+    // Store keys the tests expect SessionStore to use for this session type.
+    val sessionPath = s"v1/$sessionType"
+    val sessionManagerPath = s"v1/$sessionType/state"
+
+    val conf = new LivyConf()
+
+    // NOTE(review): the name mentions the session counter, but only the metadata write is
+    // verified here -- confirm whether save() is also expected to persist the counter.
+    it("should set session state and session counter when saving a session.") {
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+
+      val m = TestRecoveryMetadata(99)
+      sessionStore.save(sessionType, m)
+      verify(stateStore).set(s"$sessionPath/99", m)
+    }
+
+    it("should return existing sessions") {
+      // Keys that resolve normally; "5" is listed as a child but has no stored value.
+      val validMetadata = Map(
+        "0" -> Some(TestRecoveryMetadata(0)),
+        "5" -> None,
+        "77" -> Some(TestRecoveryMetadata(77)))
+      // Keys for which reading from the state store throws.
+      val corruptedMetadata = Map(
+        "7" -> new RuntimeException("Test"),
+        "11212" -> new RuntimeException("Test")
+      )
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+      when(stateStore.getChildren(sessionPath))
+        .thenReturn((validMetadata ++ corruptedMetadata).keys.toList)
+
+      validMetadata.foreach { case (id, m) =>
+        when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenReturn(m)
+      }
+
+      corruptedMetadata.foreach { case (id, ex) =>
+        when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenThrow(ex)
+      }
+
+      val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType)
+      // Verify normal metadata are retrieved; entries stored as None are not expected back.
+      s.filter(_.isSuccess) should contain theSameElementsAs
+        validMetadata.values.collect { case Some(m) => Success(m) }
+      // Verify exceptions are wrapped in Try and are returned.
+      s.filter(_.isFailure) should have size corruptedMetadata.size
+    }
+
+    it("should not throw if the state store is empty") {
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+      when(stateStore.getChildren(sessionPath)).thenReturn(Seq.empty)
+
+      val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType)
+      s.filter(_.isSuccess) shouldBe empty
+    }
+
+    it("should return correct next session id") {
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+
+      // With no stored manager state the next id is expected to be 0.
+      when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(None)
+      sessionStore.getNextSessionId(sessionType) shouldBe 0
+
+      // With stored manager state the stored next id is expected back.
+      val sms = SessionManagerState(100)
+      when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(Some(sms))
+      sessionStore.getNextSessionId(sessionType) shouldBe sms.nextSessionId
+    }
+
+    it("should remove session") {
+      val stateStore = mock[StateStore]
+      val sessionStore = new SessionStore(conf, stateStore)
+      val id = 1
+
+      // Use the same id for the call and for the expected key so the two cannot drift apart.
+      sessionStore.remove(sessionType, id)
+      verify(stateStore).remove(s"$sessionPath/$id")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala
new file mode 100644
index 0000000..c7040a5
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/StateStoreSpec.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.reflect.classTag
+
+import org.scalatest.{BeforeAndAfter, FunSpec}
+import org.scalatest.Matchers._
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.sessions.SessionManager
+
+/**
+ * Tests for the [[StateStore]] companion object: access before initialization, the store
+ * chosen for a default configuration, store selection by name, and rejection of unknown
+ * settings.
+ */
+class StateStoreSpec extends FunSpec with BeforeAndAfter with LivyBaseUnitTestSuite {
+  describe("StateStore") {
+    // Runs after every test; presumably clears the globally initialized store so that each
+    // test starts uninitialized -- confirm against StateStore.cleanup().
+    after {
+      StateStore.cleanup()
+    }
+
+    /** Builds a LivyConf with the recovery mode switched on and `stateStore` as store type. */
+    def createConf(stateStore: String): LivyConf = {
+      val recoveryConf = new LivyConf()
+      recoveryConf.set(LivyConf.RECOVERY_MODE.key, SessionManager.SESSION_RECOVERY_MODE_RECOVERY)
+      recoveryConf.set(LivyConf.RECOVERY_STATE_STORE.key, stateStore)
+      recoveryConf
+    }
+
+    it("should throw an error on get if it's not initialized") {
+      intercept[AssertionError](StateStore.get)
+    }
+
+    it("should initialize blackhole state store if recovery is disabled") {
+      val defaultConf = new LivyConf()
+      StateStore.init(defaultConf)
+      StateStore.get shouldBe a[BlackholeStateStore]
+    }
+
+    it("should pick the correct store according to state store config") {
+      val filesystemStore = StateStore.pickStateStore(createConf("filesystem"))
+      filesystemStore shouldBe classOf[FileSystemStateStore]
+      val zookeeperStore = StateStore.pickStateStore(createConf("zookeeper"))
+      zookeeperStore shouldBe classOf[ZooKeeperStateStore]
+    }
+
+    it("should return error if an unknown recovery mode is set") {
+      val unknownModeConf = new LivyConf()
+      unknownModeConf.set(LivyConf.RECOVERY_MODE.key, "unknown")
+      intercept[IllegalArgumentException](StateStore.init(unknownModeConf))
+    }
+
+    it("should return error if an unknown state store is set") {
+      intercept[IllegalArgumentException](StateStore.init(createConf("unknown")))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
new file mode 100644
index 0000000..88e530f
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.collection.JavaConverters._
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.api._
+import org.apache.curator.framework.listen.Listenable
+import org.apache.zookeeper.data.Stat
+import org.mockito.Mockito._
+import org.scalatest.FunSpec
+import org.scalatest.Matchers._
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+/**
+ * Unit tests for [[ZooKeeperStateStore]], run against a Mockito-mocked Curator client so the
+ * assertions are about the Curator calls the store makes.
+ */
+class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
+  describe("ZooKeeperStateStore") {
+    case class TestFixture(stateStore: ZooKeeperStateStore, curatorClient: CuratorFramework)
+
+    // Shared configuration. Tests must not mutate it; a test that needs different settings
+    // builds its own LivyConf and uses withMockConf, so tests stay order-independent.
+    val conf = new LivyConf()
+    conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
+    val key = "key"
+    // Path the tests expect the store to use for `key`.
+    val prefixedKey = s"/livy/$key"
+
+    /** Builds a store from `storeConf` over a fresh mocked client, then runs `testBody`. */
+    def withMockConf[R](storeConf: LivyConf)(testBody: TestFixture => R): R = {
+      val curatorClient = mock[CuratorFramework]
+      when(curatorClient.getUnhandledErrorListenable())
+        .thenReturn(mock[Listenable[UnhandledErrorListener]])
+      val stateStore = new ZooKeeperStateStore(storeConf, Some(curatorClient))
+      testBody(TestFixture(stateStore, curatorClient))
+    }
+
+    /** Same as withMockConf, using the shared configuration. */
+    def withMock[R](testBody: TestFixture => R): R = withMockConf(conf)(testBody)
+
+    /**
+     * Stubs `checkExists()` on the client. When `exists` is true the lookup of `prefixedKey`
+     * returns a mocked Stat; otherwise it is left unstubbed (Mockito then returns null).
+     */
+    def mockExistsBuilder(curatorClient: CuratorFramework, exists: Boolean): Unit = {
+      val existsBuilder = mock[ExistsBuilder]
+      when(curatorClient.checkExists()).thenReturn(existsBuilder)
+      if (exists) {
+        when(existsBuilder.forPath(prefixedKey)).thenReturn(mock[Stat])
+      }
+    }
+
+    it("should throw on bad config") {
+      // No mocked client is involved: the stores are built directly from the bad configs.
+      val badConf = new LivyConf()
+      intercept[IllegalArgumentException] { new ZooKeeperStateStore(badConf) }
+
+      badConf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
+      badConf.set(ZooKeeperStateStore.ZK_RETRY_CONF, "bad")
+      intercept[IllegalArgumentException] { new ZooKeeperStateStore(badConf) }
+    }
+
+    it("set should use curatorClient") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, true)
+
+        val setDataBuilder = mock[SetDataBuilder]
+        when(f.curatorClient.setData()).thenReturn(setDataBuilder)
+
+        f.stateStore.set(key, 1.asInstanceOf[Object])
+
+        verify(f.curatorClient).start()
+        // 49 is the ASCII code of '1': the bytes expected for the stored value 1.
+        verify(setDataBuilder).forPath(prefixedKey, Array[Byte](49))
+      }
+    }
+
+    it("set should create parents if they don't exist") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, false)
+
+        val createBuilder = mock[CreateBuilder]
+        when(f.curatorClient.create()).thenReturn(createBuilder)
+        val p = mock[ProtectACLCreateModePathAndBytesable[String]]
+        when(createBuilder.creatingParentsIfNeeded()).thenReturn(p)
+
+        f.stateStore.set(key, 1.asInstanceOf[Object])
+
+        verify(f.curatorClient).start()
+        verify(p).forPath(prefixedKey, Array[Byte](49))
+      }
+    }
+
+    it("get should retrieve retry policy configs") {
+      // Use a private conf so the retry setting does not leak into the other tests.
+      val retryConf = new LivyConf()
+      retryConf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
+      retryConf.set(ZooKeeperStateStore.ZK_RETRY_CONF, "11,77")
+      withMockConf(retryConf) { f =>
+        mockExistsBuilder(f.curatorClient, true)
+
+        f.stateStore.retryPolicy should not be null
+        f.stateStore.retryPolicy.getN shouldBe 11
+      }
+    }
+
+    it("get should retrieve data from curatorClient") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, true)
+
+        val getDataBuilder = mock[GetDataBuilder]
+        when(f.curatorClient.getData()).thenReturn(getDataBuilder)
+        // 50 is the ASCII code of '2', which the store is expected to read back as 2.
+        when(getDataBuilder.forPath(prefixedKey)).thenReturn(Array[Byte](50))
+
+        val v = f.stateStore.get[Int](key)
+
+        verify(f.curatorClient).start()
+        v shouldBe Some(2)
+      }
+    }
+
+    it("get should return None if key doesn't exist") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, false)
+
+        val v = f.stateStore.get[Int](key)
+
+        verify(f.curatorClient).start()
+        v shouldBe None
+      }
+    }
+
+    it("getChildren should use curatorClient") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, true)
+
+        val getChildrenBuilder = mock[GetChildrenBuilder]
+        when(f.curatorClient.getChildren()).thenReturn(getChildrenBuilder)
+        val children = List("abc", "def")
+        when(getChildrenBuilder.forPath(prefixedKey)).thenReturn(children.asJava)
+
+        val c = f.stateStore.getChildren(key)
+
+        verify(f.curatorClient).start()
+        c shouldBe children
+      }
+    }
+
+    it("getChildren should return empty list if key doesn't exist") {
+      withMock { f =>
+        mockExistsBuilder(f.curatorClient, false)
+
+        val c = f.stateStore.getChildren(key)
+
+        verify(f.curatorClient).start()
+        c shouldBe empty
+      }
+    }
+
+    it("remove should use curatorClient") {
+      withMock { f =>
+        val deleteBuilder = mock[DeleteBuilder]
+        when(f.curatorClient.delete()).thenReturn(deleteBuilder)
+        val g = mock[ChildrenDeletable]
+        when(deleteBuilder.guaranteed()).thenReturn(g)
+
+        f.stateStore.remove(key)
+
+        // The delete is expected to go through the guaranteed() variant of the builder.
+        verify(g).forPath(prefixedKey)
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
new file mode 100644
index 0000000..3cfbe46
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.sessions
+
+import org.apache.livy.LivyConf
+
+/**
+ * Minimal concrete [[Session]] for tests: its state is always Idle, it has no proxy user,
+ * produces no log lines and does nothing when asked to stop.
+ */
+class MockSession(id: Int, owner: String, conf: LivyConf) extends Session(id, owner, conf) {
+  case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata()
+
+  // The metadata id is always 0; it does not reflect the session's own id.
+  override def recoveryMetadata: RecoveryMetadata = RecoveryMetadata(0)
+
+  override def state: SessionState = SessionState.Idle()
+
+  override val proxyUser = None
+
+  override def logLines(): IndexedSeq[String] = IndexedSeq.empty
+
+  // Nothing to tear down for a mock session.
+  override protected def stopSession(): Unit = {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
new file mode 100644
index 0000000..beffa71
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.sessions
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Try}
+
+import org.mockito.Mockito.{doReturn, never, verify, when}
+import org.scalatest.{FunSpec, Matchers}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
+import org.apache.livy.server.interactive.InteractiveSession
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions.Session.RecoveryMetadata
+
+// Tests for SessionManager garbage collection and for BatchSessionManager recovery, deletion
+// and shutdown, using Mockito-mocked sessions and a mocked SessionStore.
+class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
+ // Execution context for the `Future {}` values returned by the stubbed stop() calls.
+ implicit def executor: ExecutionContext = ExecutionContext.global
+
+ describe("SessionManager") {
+ it("should garbage collect old sessions") {
+ val livyConf = new LivyConf()
+ // Short session timeout so the registered session becomes collectable quickly.
+ livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
+ val manager = new SessionManager[MockSession, RecoveryMetadata](
+ livyConf,
+ // This function argument fails the test if the manager ever invokes it.
+ { _ => assert(false).asInstanceOf[MockSession] },
+ mock[SessionStore],
+ "test",
+ Some(Seq.empty))
+ val session = manager.register(new MockSession(manager.nextId(), null, livyConf))
+ manager.get(session.id).isDefined should be(true)
+ // Re-run garbage collection every 100 ms, for up to 5 seconds, until the session is gone.
+ eventually(timeout(5 seconds), interval(100 millis)) {
+ Await.result(manager.collectGarbage(), Duration.Inf)
+ manager.get(session.id) should be(None)
+ }
+ }
+
+ it("batch session should not be gc-ed until application is finished") {
+ val sessionId = 24
+ val session = mock[BatchSession]
+ when(session.id).thenReturn(sessionId)
+ when(session.stop()).thenReturn(Future {})
+ when(session.lastActivity).thenReturn(System.nanoTime())
+
+ val conf = new LivyConf().set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+ val sm = new BatchSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+ testSessionGC(session, sm)
+ }
+
+ it("interactive session should not gc-ed if session timeout check is off") {
+ val sessionId = 24
+ val session = mock[InteractiveSession]
+ when(session.id).thenReturn(sessionId)
+ when(session.stop()).thenReturn(Future {})
+ when(session.lastActivity).thenReturn(System.nanoTime())
+
+ val conf = new LivyConf().set(LivyConf.SESSION_TIMEOUT_CHECK, false)
+ .set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+ val sm = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+ testSessionGC(session, sm)
+ }
+
+ // Shared by the batch and interactive tests above: steps the mocked session through a
+ // series of states, running garbage collection after each change, and checks whether
+ // the manager still holds the session.
+ def testSessionGC(session: Session, sm: SessionManager[_, _]): Unit = {
+
+ // Stubs session.state to return `s`, runs one garbage collection pass to completion,
+ // then hands the manager to `fn` for assertions.
+ def changeStateAndCheck(s: SessionState)(fn: SessionManager[_, _] => Unit): Unit = {
+ doReturn(s).when(session).state
+ Await.result(sm.collectGarbage(), Duration.Inf)
+ fn(sm)
+ }
+
+ // A session (batch or interactive) in any of these states should not be gc-ed
+ for (s <- Seq(SessionState.Running(),
+ SessionState.Idle(),
+ SessionState.Recovering(),
+ SessionState.NotStarted(),
+ SessionState.Busy(),
+ SessionState.ShuttingDown())) {
+ changeStateAndCheck(s) { sm => sm.get(session.id) should be (Some(session)) }
+ }
+
+ // Stopped session should be gc-ed after retained timeout
+ for (s <- Seq(SessionState.Error(),
+ SessionState.Success(),
+ SessionState.Dead())) {
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ changeStateAndCheck(s) { sm => sm.get(session.id) should be (None) }
+ }
+ }
+ }
+ }
+
+ describe("BatchSessionManager") {
+ // NOTE(review): this repeats the class-level implicit `executor` defined above and
+ // shadows it inside this block; it looks redundant -- confirm before removing.
+ implicit def executor: ExecutionContext = ExecutionContext.global
+
+ // Builds batch recovery metadata with the given id and app tag; the other fields are
+ // left as None / null.
+ def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = {
+ BatchRecoveryMetadata(id, None, appTag, null, None)
+ }
+
+ // Returns a mocked BatchSession with the given id whose stop() returns a Future and
+ // whose lastActivity is the time at which the mock was created.
+ def mockSession(id: Int): BatchSession = {
+ val session = mock[BatchSession]
+ when(session.id).thenReturn(id)
+ when(session.stop()).thenReturn(Future {})
+ when(session.lastActivity).thenReturn(System.nanoTime())
+
+ session
+ }
+
+ it("should not fail if state store is empty") {
+ val conf = new LivyConf()
+
+ val sessionStore = mock[SessionStore]
+ when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch"))
+ .thenReturn(Seq.empty)
+
+ val sm = new BatchSessionManager(conf, sessionStore)
+ sm.nextId() shouldBe 0
+ }
+
+ it("should recover sessions from state store") {
+ val conf = new LivyConf()
+ conf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster")
+
+ val sessionType = "batch"
+ val nextId = 99
+
+ // Two readable metadata entries plus one failed read; only the readable ones are
+ // expected to become sessions.
+ val validMetadata = List(makeMetadata(0, "t1"), makeMetadata(77, "t2")).map(Try(_))
+ val invalidMetadata = List(Failure(new Exception("Fake invalid metadata")))
+ val sessionStore = mock[SessionStore]
+ when(sessionStore.getNextSessionId(sessionType)).thenReturn(nextId)
+ when(sessionStore.getAllSessions[BatchRecoveryMetadata](sessionType))
+ .thenReturn(validMetadata ++ invalidMetadata)
+
+ val sm = new BatchSessionManager(conf, sessionStore)
+ sm.nextId() shouldBe nextId
+ validMetadata.foreach { m =>
+ sm.get(m.get.id) shouldBe defined
+ }
+ sm.size shouldBe validMetadata.size
+ }
+
+ it("should delete sessions from state store") {
+ val conf = new LivyConf()
+
+ val sessionType = "batch"
+ val sessionId = 24
+ val sessionStore = mock[SessionStore]
+ val session = mockSession(sessionId)
+
+ val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session)))
+ sm.get(sessionId) shouldBe defined
+
+ // Wait for the deletion future to complete before checking the store and the manager.
+ Await.ready(sm.delete(sessionId).get, 30 seconds)
+
+ verify(sessionStore).remove(sessionType, sessionId)
+ sm.get(sessionId) shouldBe None
+ }
+
+ it("should delete sessions on shutdown when recovery is off") {
+ val conf = new LivyConf()
+ val sessionId = 24
+ val sessionStore = mock[SessionStore]
+ val session = mockSession(sessionId)
+
+ val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session)))
+ sm.get(sessionId) shouldBe defined
+ sm.shutdown()
+
+ verify(session).stop()
+ }
+
+ it("should not delete sessions on shutdown with recovery is on") {
+ val conf = new LivyConf()
+ conf.set(LivyConf.RECOVERY_MODE, SessionManager.SESSION_RECOVERY_MODE_RECOVERY)
+
+ val sessionId = 24
+ val sessionStore = mock[SessionStore]
+ val session = mockSession(sessionId)
+
+ val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session)))
+ sm.get(sessionId) shouldBe defined
+ sm.shutdown()
+
+ // With recovery enabled, shutdown must leave the session running.
+ verify(session, never).stop()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala
new file mode 100644
index 0000000..a7bfaaa
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionSpec.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.sessions
+
+import java.net.URI
+
+import org.scalatest.FunSuite
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+class SessionSpec extends FunSuite with LivyBaseUnitTestSuite {
+
+ test("use default fs in paths") {
+ val conf = new LivyConf(false)
+ conf.hadoopConf.set("fs.defaultFS", "dummy:///")
+
+ val uris = Seq("http://example.com/foo", "hdfs:/bar", "/baz")
+ val expected = Seq(uris(0), uris(1), "dummy:///baz")
+ assert(Session.resolveURIs(uris, conf) === expected)
+
+ intercept[IllegalArgumentException] {
+ Session.resolveURI(new URI("relative_path"), conf)
+ }
+ }
+
+ test("local fs whitelist") {
+ val conf = new LivyConf(false)
+ conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed/,/also_allowed")
+
+ Seq("/allowed/file", "/also_allowed/file").foreach { path =>
+ assert(Session.resolveURI(new URI(path), conf) === new URI("file://" + path))
+ }
+
+ Seq("/not_allowed", "/allowed_not_really").foreach { path =>
+ intercept[IllegalArgumentException] {
+ Session.resolveURI(new URI(path), conf)
+ }
+ }
+ }
+
+ test("conf validation and preparation") {
+ val conf = new LivyConf(false)
+ conf.hadoopConf.set("fs.defaultFS", "dummy:///")
+ conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed")
+
+ // Test baseline.
+ assert(Session.prepareConf(Map(), Nil, Nil, Nil, Nil, conf) === Map("spark.master" -> "local"))
+
+ // Test validations.
+ intercept[IllegalArgumentException] {
+ Session.prepareConf(Map("spark.do_not_set" -> "1"), Nil, Nil, Nil, Nil, conf)
+ }
+ conf.sparkFileLists.foreach { key =>
+ intercept[IllegalArgumentException] {
+ Session.prepareConf(Map(key -> "file:/not_allowed"), Nil, Nil, Nil, Nil, conf)
+ }
+ }
+ intercept[IllegalArgumentException] {
+ Session.prepareConf(Map(), Seq("file:/not_allowed"), Nil, Nil, Nil, conf)
+ }
+ intercept[IllegalArgumentException] {
+ Session.prepareConf(Map(), Nil, Seq("file:/not_allowed"), Nil, Nil, conf)
+ }
+ intercept[IllegalArgumentException] {
+ Session.prepareConf(Map(), Nil, Nil, Seq("file:/not_allowed"), Nil, conf)
+ }
+ intercept[IllegalArgumentException] {
+ Session.prepareConf(Map(), Nil, Nil, Nil, Seq("file:/not_allowed"), conf)
+ }
+
+ // Test that file lists are merged and resolved.
+ val base = "/file1.txt"
+ val other = Seq("/file2.txt")
+ val expected = Some(Seq("dummy://" + other(0), "dummy://" + base).mkString(","))
+
+ val userLists = Seq(LivyConf.SPARK_JARS, LivyConf.SPARK_FILES, LivyConf.SPARK_ARCHIVES,
+ LivyConf.SPARK_PY_FILES)
+ val baseConf = userLists.map { key => (key -> base) }.toMap
+ val result = Session.prepareConf(baseConf, other, other, other, other, conf)
+ userLists.foreach { key => assert(result.get(key) === expected) }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala
new file mode 100644
index 0000000..eb5f30a
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.utils
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.LivyConf._
+import org.apache.livy.server.LivyServer
+
+class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSuite {
+
+ import LivySparkUtils._
+
+ private val livyConf = new LivyConf()
+ private val livyConf210 = new LivyConf()
+ livyConf210.set(LIVY_SPARK_SCALA_VERSION, "2.10.6")
+
+ private val livyConf211 = new LivyConf()
+ livyConf211.set(LIVY_SPARK_SCALA_VERSION, "2.11.1")
+
+ test("check for SPARK_HOME") {
+ testSparkHome(livyConf)
+ }
+
+ test("check spark-submit version") {
+ testSparkSubmit(livyConf)
+ }
+
+ test("should support Spark 1.6") {
+ testSparkVersion("1.6.0")
+ testSparkVersion("1.6.1")
+ testSparkVersion("1.6.1-SNAPSHOT")
+ testSparkVersion("1.6.2")
+ testSparkVersion("1.6")
+ testSparkVersion("1.6.3.2.5.0-12")
+ }
+
+ test("should support Spark 2.0.x and 2.1.x") {
+ testSparkVersion("2.0.0")
+ testSparkVersion("2.0.1")
+ testSparkVersion("2.0.2")
+ testSparkVersion("2.0.3-SNAPSHOT")
+ testSparkVersion("2.0.0.2.5.1.0-56") // LIVY-229
+ testSparkVersion("2.0")
+ testSparkVersion("2.1.0") // 2.1.x versions are accepted by the same check
+ testSparkVersion("2.1.1")
+ }
+
+ test("should not support Spark older than 1.6") {
+ intercept[IllegalArgumentException] { testSparkVersion("1.4.0") }
+ intercept[IllegalArgumentException] { testSparkVersion("1.5.0") }
+ intercept[IllegalArgumentException] { testSparkVersion("1.5.1") }
+ intercept[IllegalArgumentException] { testSparkVersion("1.5.2") }
+ intercept[IllegalArgumentException] { testSparkVersion("1.5.0-cdh5.6.1") }
+ }
+
+ test("should fail on bad version") {
+ intercept[IllegalArgumentException] { testSparkVersion("not a version") }
+ }
+
+ test("should error out if recovery is turned on but master isn't yarn") {
+ val livyConf = new LivyConf()
+ livyConf.set(LivyConf.LIVY_SPARK_MASTER, "local")
+ livyConf.set(LivyConf.RECOVERY_MODE, "recovery")
+ val s = new LivyServer()
+ intercept[IllegalArgumentException] { s.testRecovery(livyConf) }
+ }
+
+ test("formatScalaVersion() should format Scala version") {
+ formatScalaVersion("2.10.8") shouldBe "2.10"
+ formatScalaVersion("2.11.4") shouldBe "2.11"
+ formatScalaVersion("2.10") shouldBe "2.10"
+ formatScalaVersion("2.10.x.x.x.x") shouldBe "2.10"
+
+ // Throw exception for bad Scala version.
+ intercept[IllegalArgumentException] { formatScalaVersion("") }
+ intercept[IllegalArgumentException] { formatScalaVersion("xxx") }
+ }
+
+ test("defaultSparkScalaVersion() should return default Scala version") {
+ defaultSparkScalaVersion(formatSparkVersion("1.6.0")) shouldBe "2.10"
+ defaultSparkScalaVersion(formatSparkVersion("1.6.1")) shouldBe "2.10"
+ defaultSparkScalaVersion(formatSparkVersion("1.6.2")) shouldBe "2.10"
+ defaultSparkScalaVersion(formatSparkVersion("2.0.0")) shouldBe "2.11"
+ defaultSparkScalaVersion(formatSparkVersion("2.0.1")) shouldBe "2.11"
+
+ // Throw exception for unsupported Spark version.
+ intercept[IllegalArgumentException] { defaultSparkScalaVersion(formatSparkVersion("1.5.0")) }
+ }
+
+ test("sparkScalaVersion() should use spark-submit detected Scala version.") {
+ sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.10"), livyConf) shouldBe "2.10"
+ sparkScalaVersion(formatSparkVersion("1.6.0"), Some("2.11"), livyConf) shouldBe "2.11"
+ }
+
+ test("sparkScalaVersion() should throw if configured and detected Scala version mismatch.") {
+ intercept[IllegalArgumentException] {
+ sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.11"), livyConf210)
+ }
+ intercept[IllegalArgumentException] {
+ sparkScalaVersion(formatSparkVersion("1.6.1"), Some("2.10"), livyConf211)
+ }
+ }
+
+ test("sparkScalaVersion() should use configured Scala version if spark-submit doesn't tell.") {
+ sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf210) shouldBe "2.10"
+ sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf210) shouldBe "2.10"
+ sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf210) shouldBe "2.10"
+ sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf210) shouldBe "2.10"
+ sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf211) shouldBe "2.11"
+ sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf211) shouldBe "2.11"
+ sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf211) shouldBe "2.11"
+ sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf211) shouldBe "2.11"
+ }
+
+ test("sparkScalaVersion() should use default Spark Scala version.") {
+ sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf) shouldBe "2.10"
+ sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf) shouldBe "2.10"
+ sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf) shouldBe "2.11"
+ sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf) shouldBe "2.11"
+ sparkScalaVersion(formatSparkVersion("2.1.0"), None, livyConf) shouldBe "2.11"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
new file mode 100644
index 0000000..b3c50da
--- /dev/null
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.utils
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus.UNDEFINED
+import org.apache.hadoop.yarn.api.records.YarnApplicationState._
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.FunSpec
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.utils.SparkApp._
+
+class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
+ private def cleanupThread(t: Thread)(f: => Unit) = { // runs f, then always interrupts t
+ try { f } finally { t.interrupt() }
+ }
+
+ private def mockSleep(ms: Long) = { // sleep stand-in: ignores ms and only yields the thread
+ Thread.`yield`()
+ }
+
+ describe("SparkYarnApp") {
+ val TEST_TIMEOUT = 30 seconds
+ val appId = ConverterUtils.toApplicationId("application_1467912463905_0021")
+ val appIdOption = Some(appId.toString)
+ val appTag = "fakeTag"
+ val livyConf = new LivyConf()
+ livyConf.set(LivyConf.YARN_APP_LOOKUP_TIMEOUT, "30s")
+
+ it("should poll YARN state and terminate") {
+ Clock.withSleepMethod(mockSleep) {
+ val mockYarnClient = mock[YarnClient]
+ val mockAppListener = mock[SparkAppListener]
+
+ val mockAppReport = mock[ApplicationReport]
+ when(mockAppReport.getApplicationId).thenReturn(appId)
+ when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+ // Simulate YARN app state progression.
+ when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
+ private var stateSeq = List(ACCEPTED, RUNNING, FINISHED)
+
+ override def answer(invocation: InvocationOnMock): YarnApplicationState = {
+ val currentState = stateSeq.head
+ if (stateSeq.tail.nonEmpty) {
+ stateSeq = stateSeq.tail
+ }
+ currentState
+ }
+ })
+ when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+ val app = new SparkYarnApp(
+ appTag,
+ appIdOption,
+ None,
+ Some(mockAppListener),
+ livyConf,
+ mockYarnClient)
+ cleanupThread(app.yarnAppMonitorThread) {
+ app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+ assert(!app.yarnAppMonitorThread.isAlive,
+ "YarnAppMonitorThread should terminate after YARN app is finished.")
+ verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+ verify(mockAppListener).stateChanged(State.STARTING, State.RUNNING)
+ verify(mockAppListener).stateChanged(State.RUNNING, State.FINISHED)
+ }
+ }
+ }
+
+ it("should kill yarn app") {
+ Clock.withSleepMethod(mockSleep) {
+ val diag = "DIAG"
+ val mockYarnClient = mock[YarnClient]
+
+ val mockAppReport = mock[ApplicationReport]
+ when(mockAppReport.getApplicationId).thenReturn(appId)
+ when(mockAppReport.getDiagnostics).thenReturn(diag)
+ when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+
+ @volatile var appKilled = false // set by the test thread, read inside the stubbed Answer
+ when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
+ override def answer(invocation: InvocationOnMock): YarnApplicationState = {
+ if (!appKilled) {
+ RUNNING
+ } else {
+ KILLED
+ }
+ }
+ })
+ when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+ val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf, mockYarnClient)
+ cleanupThread(app.yarnAppMonitorThread) {
+ app.kill()
+ appKilled = true // from here on the stub reports KILLED
+
+ app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+ assert(!app.yarnAppMonitorThread.isAlive,
+ "YarnAppMonitorThread should terminate after YARN app is finished.")
+ verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+ verify(mockYarnClient).killApplication(appId)
+ assert(app.log().mkString.contains(diag))
+ }
+ }
+ }
+
+ it("should return spark-submit log") {
+ Clock.withSleepMethod(mockSleep) {
+ val mockYarnClient = mock[YarnClient]
+ val mockSparkSubmit = mock[LineBufferedProcess]
+ val sparkSubmitInfoLog = IndexedSeq("SPARK-SUBMIT", "LOG")
+ val sparkSubmitErrorLog = IndexedSeq("SPARK-SUBMIT", "error log")
+ val sparkSubmitLog = ("stdout: " +: sparkSubmitInfoLog) ++
+ ("\nstderr: " +: sparkSubmitErrorLog) :+ "\nYARN Diagnostics: "
+ when(mockSparkSubmit.inputLines).thenReturn(sparkSubmitInfoLog)
+ when(mockSparkSubmit.errorLines).thenReturn(sparkSubmitErrorLog)
+ val waitForCalledLatch = new CountDownLatch(1)
+ when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
+ override def answer(invocation: InvocationOnMock): Int = {
+ waitForCalledLatch.countDown()
+ 0
+ }
+ })
+
+ val mockAppReport = mock[ApplicationReport]
+ when(mockAppReport.getApplicationId).thenReturn(appId)
+ when(mockAppReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
+ when(mockAppReport.getDiagnostics).thenReturn(null)
+ when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+ val app = new SparkYarnApp(
+ appTag,
+ appIdOption,
+ Some(mockSparkSubmit),
+ None,
+ livyConf,
+ mockYarnClient)
+ cleanupThread(app.yarnAppMonitorThread) {
+ waitForCalledLatch.await(TEST_TIMEOUT.toMillis, TimeUnit.MILLISECONDS)
+ assert(app.log() == sparkSubmitLog, "Expect spark-submit log")
+ }
+ }
+ }
+
+ it("can kill spark-submit while it's running") {
+ Clock.withSleepMethod(mockSleep) {
+ val livyConf = new LivyConf()
+ livyConf.set(LivyConf.YARN_APP_LOOKUP_TIMEOUT, "0")
+
+ val mockYarnClient = mock[YarnClient]
+ val mockSparkSubmit = mock[LineBufferedProcess]
+
+ val sparkSubmitRunningLatch = new CountDownLatch(1)
+ // Simulate a running spark-submit
+ when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
+ override def answer(invocation: InvocationOnMock): Int = {
+ sparkSubmitRunningLatch.await()
+ 0
+ }
+ })
+
+ val app = new SparkYarnApp(
+ appTag,
+ appIdOption,
+ Some(mockSparkSubmit),
+ None,
+ livyConf,
+ mockYarnClient)
+ cleanupThread(app.yarnAppMonitorThread) {
+ app.kill()
+ verify(mockSparkSubmit, times(1)).destroy()
+ sparkSubmitRunningLatch.countDown()
+ }
+ }
+ }
+
+ it("should map YARN state to SparkApp.State correctly") {
+ val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf)
+ cleanupThread(app.yarnAppMonitorThread) {
+ assert(app.mapYarnState(appId, NEW, UNDEFINED) == State.STARTING)
+ assert(app.mapYarnState(appId, NEW_SAVING, UNDEFINED) == State.STARTING)
+ assert(app.mapYarnState(appId, SUBMITTED, UNDEFINED) == State.STARTING)
+ assert(app.mapYarnState(appId, ACCEPTED, UNDEFINED) == State.STARTING)
+ assert(app.mapYarnState(appId, RUNNING, UNDEFINED) == State.RUNNING)
+ assert(
+ app.mapYarnState(appId, FINISHED, FinalApplicationStatus.SUCCEEDED) == State.FINISHED)
+ assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.FAILED) == State.FAILED)
+ assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.KILLED) == State.KILLED)
+ assert(app.mapYarnState(appId, FINISHED, UNDEFINED) == State.FAILED)
+ assert(app.mapYarnState(appId, FAILED, UNDEFINED) == State.FAILED)
+ assert(app.mapYarnState(appId, KILLED, UNDEFINED) == State.KILLED)
+ }
+ }
+
+ it("should expose driver log url and Spark UI url") {
+ Clock.withSleepMethod(mockSleep) {
+ val mockYarnClient = mock[YarnClient]
+ val driverLogUrl = "DRIVER LOG URL"
+ val sparkUiUrl = "SPARK UI URL"
+
+ val mockApplicationAttemptId = mock[ApplicationAttemptId]
+ val mockAppReport = mock[ApplicationReport]
+ when(mockAppReport.getApplicationId).thenReturn(appId)
+ when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+ when(mockAppReport.getTrackingUrl).thenReturn(sparkUiUrl)
+ when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
+ @volatile var done = false // set by the test thread, read inside the stubbed Answer
+ when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
+ override def answer(invocation: InvocationOnMock): YarnApplicationState = {
+ if (!done) {
+ RUNNING
+ } else {
+ FINISHED
+ }
+ }
+ })
+ when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+ val mockAttemptReport = mock[ApplicationAttemptReport]
+ val mockContainerId = mock[ContainerId]
+ when(mockAttemptReport.getAMContainerId).thenReturn(mockContainerId)
+ when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId))
+ .thenReturn(mockAttemptReport)
+
+ val mockContainerReport = mock[ContainerReport]
+ when(mockYarnClient.getContainerReport(mockContainerId)).thenReturn(mockContainerReport)
+
+ // Block test until getLogUrl is called 10 times.
+ val getLogUrlCountDown = new CountDownLatch(10)
+ when(mockContainerReport.getLogUrl).thenAnswer(new Answer[String] {
+ override def answer(invocation: InvocationOnMock): String = {
+ getLogUrlCountDown.countDown()
+ driverLogUrl
+ }
+ })
+
+ val mockListener = mock[SparkAppListener]
+
+ val app = new SparkYarnApp(
+ appTag, appIdOption, None, Some(mockListener), livyConf, mockYarnClient)
+ cleanupThread(app.yarnAppMonitorThread) {
+ getLogUrlCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+ done = true // from here on the stub reports FINISHED
+
+ app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+ assert(!app.yarnAppMonitorThread.isAlive,
+ "YarnAppMonitorThread should terminate after YARN app is finished.")
+
+ verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+ verify(mockAppReport, atLeast(1)).getTrackingUrl()
+ verify(mockContainerReport, atLeast(1)).getLogUrl()
+ verify(mockListener).appIdKnown(appId.toString)
+ verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl)))
+ }
+ }
+ }
+
+ it("should not die on YARN-4411") {
+ Clock.withSleepMethod(mockSleep) {
+ val mockYarnClient = mock[YarnClient]
+
+ // Block test until getApplicationReport is called 10 times.
+ val pollCountDown = new CountDownLatch(10)
+ when(mockYarnClient.getApplicationReport(appId)).thenAnswer(new Answer[ApplicationReport] {
+ override def answer(invocation: InvocationOnMock): ApplicationReport = {
+ pollCountDown.countDown()
+ throw new IllegalArgumentException("No enum constant " +
+ "org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState.FINAL_SAVING")
+ }
+ })
+
+ val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf, mockYarnClient)
+ cleanupThread(app.yarnAppMonitorThread) {
+ pollCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+ assert(app.state == SparkApp.State.STARTING)
+
+ app.state = SparkApp.State.FINISHED
+ app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+ }
+ }
+ }
+
+ it("should not die on ApplicationAttemptNotFoundException") {
+ Clock.withSleepMethod(mockSleep) {
+ val mockYarnClient = mock[YarnClient]
+ val mockAppReport = mock[ApplicationReport]
+ val mockApplicationAttemptId = mock[ApplicationAttemptId]
+ @volatile var done = false // set by the test thread, read inside the stubbed Answer
+
+ when(mockAppReport.getApplicationId).thenReturn(appId)
+ when(mockAppReport.getYarnApplicationState).thenAnswer(
+ new Answer[YarnApplicationState]() {
+ override def answer(invocation: InvocationOnMock): YarnApplicationState = {
+ if (done) {
+ FINISHED
+ } else {
+ RUNNING
+ }
+ }
+ })
+ when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+ when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
+ when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+
+ // Block test until getApplicationAttemptReport is called 10 times.
+ val pollCountDown = new CountDownLatch(10)
+ when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId)).thenAnswer(
+ new Answer[ApplicationAttemptReport] {
+ override def answer(invocation: InvocationOnMock): ApplicationAttemptReport = {
+ pollCountDown.countDown()
+ throw new ApplicationAttemptNotFoundException("unit test")
+ }
+ })
+
+ val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf, mockYarnClient)
+ cleanupThread(app.yarnAppMonitorThread) {
+ pollCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
+ assert(app.state == SparkApp.State.RUNNING) // the repeated exception must not change state
+ done = true // from here on the stub reports FINISHED
+
+ app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/pom.xml
----------------------------------------------------------------------
diff --git a/test-lib/pom.xml b/test-lib/pom.xml
index 131e0e6..c380764 100644
--- a/test-lib/pom.xml
+++ b/test-lib/pom.xml
@@ -18,14 +18,14 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-main</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
</parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-test-lib</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/apps/FailingApp.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/apps/FailingApp.java b/test-lib/src/main/java/com/cloudera/livy/test/apps/FailingApp.java
deleted file mode 100644
index 51ba795..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/apps/FailingApp.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.apps;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class FailingApp {
-
- public static void main(String[] args) throws Exception {
- if (args.length != 1) {
- throw new IllegalArgumentException("Missing output path.");
- }
- String output = args[0];
-
- FileSystem fs = FileSystem.get(new Configuration());
- Path out = new Path(output);
- fs.create(out).close();
-
- throw new IllegalStateException("This app always fails.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/apps/SimpleSparkApp.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/apps/SimpleSparkApp.java b/test-lib/src/main/java/com/cloudera/livy/test/apps/SimpleSparkApp.java
deleted file mode 100644
index 922066e..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/apps/SimpleSparkApp.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.apps;
-
-import java.util.Arrays;
-import java.util.List;
-
-import scala.Tuple2;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-
-public class SimpleSparkApp {
-
- public static void main(String[] args) throws Exception {
- if (args.length < 1 || args.length > 2) {
- throw new IllegalArgumentException(
- "Invalid arguments. <output path> [exit after output=true]>");
- }
-
- String output = args[0];
- Boolean exitAfterOutput = true;
- if (args.length == 2) {
- exitAfterOutput = Boolean.parseBoolean(args[1]);
- }
-
- JavaSparkContext sc = new JavaSparkContext();
- try {
- List<String> data = Arrays.asList("the", "quick", "brown", "fox", "jumped", "over", "the",
- "lazy", "dog");
-
- JavaPairRDD<String, Integer> rdd = sc.parallelize(data, 3)
- .mapToPair(new Counter());
- rdd.saveAsTextFile(output);
-
- if (!exitAfterOutput) {
- while (true) {
- Thread.sleep(60 * 60 * 1000);
- }
- }
- } finally {
- sc.close();
- }
- }
-
- private static class Counter implements PairFunction<String, String, Integer> {
-
- @Override
- public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<>(s, s.length());
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/Echo.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Echo.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/Echo.java
deleted file mode 100644
index 7ea3aa7..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Echo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class Echo<T> implements Job<T> {
-
- private final T value;
-
- public Echo(T value) {
- this.value = value;
- }
-
- @Override
- public T call(JobContext jc) {
- return value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/Failure.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Failure.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/Failure.java
deleted file mode 100644
index 9232347..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Failure.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class Failure implements Job<Void> {
-
- @Override
- public Void call(JobContext jc) {
- throw new JobFailureException();
- }
-
- public static class JobFailureException extends RuntimeException {
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java
deleted file mode 100644
index 3bca2f3..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import org.apache.spark.SparkFiles;
-import org.apache.spark.api.java.function.Function;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class FileReader implements Job<String> {
-
- private final boolean isResource;
- private final String fileName;
-
- public FileReader(String fileName, boolean isResource) {
- this.fileName = fileName;
- this.isResource = isResource;
- }
-
- @Override
- public String call(JobContext jc) {
- return jc.sc().parallelize(Arrays.asList(1)).map(new Reader()).collect().get(0);
- }
-
- private class Reader implements Function<Integer, String> {
-
- @Override
- public String call(Integer i) throws Exception {
- InputStream in;
- if (isResource) {
- ClassLoader ccl = Thread.currentThread().getContextClassLoader();
- in = ccl.getResourceAsStream(fileName);
- if (in == null) {
- throw new IOException("Resource not found: " + fileName);
- }
- } else {
- in = new FileInputStream(SparkFiles.get(fileName));
- }
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buf = new byte[1024];
- int read;
- while ((read = in.read(buf)) >= 0) {
- out.write(buf, 0, read);
- }
- byte[] bytes = out.toByteArray();
- return new String(bytes, 0, bytes.length, UTF_8);
- } finally {
- in.close();
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/GetCurrentUser.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/GetCurrentUser.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/GetCurrentUser.java
deleted file mode 100644
index 5919896..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/GetCurrentUser.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import org.apache.hadoop.security.UserGroupInformation;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class GetCurrentUser implements Job<String> {
-
- @Override
- public String call(JobContext jc) throws Exception {
- return UserGroupInformation.getCurrentUser().getUserName();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/SQLGetTweets.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/SQLGetTweets.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/SQLGetTweets.java
deleted file mode 100644
index 15aa79b..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/SQLGetTweets.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import java.io.File;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class SQLGetTweets implements Job<List<String>> {
-
- private final boolean useHiveContext;
-
- public SQLGetTweets(boolean useHiveContext) {
- this.useHiveContext = useHiveContext;
- }
-
- @Override
- public List<String> call(JobContext jc) throws Exception {
- InputStream source = getClass().getResourceAsStream("/testweet.json");
-
- // Save the resource as a file in HDFS (or the local tmp dir when using a local filesystem).
- URI input;
- File local = File.createTempFile("tweets", ".json", jc.getLocalTmpDir());
- Files.copy(source, local.toPath(), StandardCopyOption.REPLACE_EXISTING);
- FileSystem fs = FileSystem.get(jc.sc().sc().hadoopConfiguration());
- if ("file".equals(fs.getUri().getScheme())) {
- input = local.toURI();
- } else {
- String uuid = UUID.randomUUID().toString();
- Path target = new Path("/tmp/" + uuid + "-tweets.json");
- fs.copyFromLocalFile(new Path(local.toURI()), target);
- input = target.toUri();
- }
-
- SQLContext sqlctx = useHiveContext ? jc.hivectx() : jc.sqlctx();
- sqlctx.jsonFile(input.toString()).registerTempTable("tweets");
-
- List<String> tweetList = new ArrayList<>();
- Row[] result =
- (Row[])(sqlctx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
- .collect());
- for (Row r : result) {
- tweetList.add(r.toString());
- }
- return tweetList;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java
deleted file mode 100644
index a3c33d4..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/Sleeper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class Sleeper implements Job<Void> {
-
- private final long millis;
-
- public Sleeper(long millis) {
- this.millis = millis;
- }
-
- @Override
- public Void call(JobContext jc) throws Exception {
- Thread.sleep(millis);
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/SmallCount.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/SmallCount.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/SmallCount.java
deleted file mode 100644
index fb74027..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/SmallCount.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class SmallCount implements Job<Long> {
-
- private final int count;
-
- public SmallCount(int count) {
- this.count = count;
- }
-
- @Override
- public Long call(JobContext jc) {
- Random r = new Random();
- int partitions = Math.min(r.nextInt(10) + 1, count);
-
- List<Integer> elements = new ArrayList<>(count);
- for (int i = 0; i < count; i++) {
- elements.add(r.nextInt());
- }
-
- return jc.sc().parallelize(elements, partitions).count();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/com/cloudera/livy/test/jobs/VoidJob.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/VoidJob.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/VoidJob.java
deleted file mode 100644
index 0333ccf..0000000
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/VoidJob.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.jobs;
-
-import com.cloudera.livy.Job;
-import com.cloudera.livy.JobContext;
-
-public class VoidJob implements Job<Void> {
- @Override
- public Void call(JobContext jc) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/apps/FailingApp.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/org/apache/livy/test/apps/FailingApp.java b/test-lib/src/main/java/org/apache/livy/test/apps/FailingApp.java
new file mode 100644
index 0000000..6fa4850
--- /dev/null
+++ b/test-lib/src/main/java/org/apache/livy/test/apps/FailingApp.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.apps;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FailingApp {
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 1) {
+ throw new IllegalArgumentException("Missing output path.");
+ }
+ String output = args[0];
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ Path out = new Path(output);
+ fs.create(out).close();
+
+ throw new IllegalStateException("This app always fails.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java b/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java
new file mode 100644
index 0000000..ef8e1a6
--- /dev/null
+++ b/test-lib/src/main/java/org/apache/livy/test/apps/SimpleSparkApp.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.apps;
+
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Tuple2;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+
+public class SimpleSparkApp {
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 1 || args.length > 2) {
+ throw new IllegalArgumentException(
+ "Invalid arguments. <output path> [exit after output=true]>");
+ }
+
+ String output = args[0];
+ Boolean exitAfterOutput = true;
+ if (args.length == 2) {
+ exitAfterOutput = Boolean.parseBoolean(args[1]);
+ }
+
+ JavaSparkContext sc = new JavaSparkContext();
+ try {
+ List<String> data = Arrays.asList("the", "quick", "brown", "fox", "jumped", "over", "the",
+ "lazy", "dog");
+
+ JavaPairRDD<String, Integer> rdd = sc.parallelize(data, 3)
+ .mapToPair(new Counter());
+ rdd.saveAsTextFile(output);
+
+ if (!exitAfterOutput) {
+ while (true) {
+ Thread.sleep(60 * 60 * 1000);
+ }
+ }
+ } finally {
+ sc.close();
+ }
+ }
+
+ private static class Counter implements PairFunction<String, String, Integer> {
+
+ @Override
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<>(s, s.length());
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/jobs/Echo.java
----------------------------------------------------------------------
diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/Echo.java b/test-lib/src/main/java/org/apache/livy/test/jobs/Echo.java
new file mode 100644
index 0000000..3c3c834
--- /dev/null
+++ b/test-lib/src/main/java/org/apache/livy/test/jobs/Echo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.jobs;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+public class Echo<T> implements Job<T> {
+
+ private final T value;
+
+ public Echo(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public T call(JobContext jc) {
+ return value;
+ }
+
+}