You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:33 UTC
[04/33] incubator-livy git commit: LIVY-375. Change Livy code package
name to org.apache.livy
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequestSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequestSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequestSpec.scala
deleted file mode 100644
index 168d487..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequestSpec.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.interactive
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.scalatest.FunSpec
-
-import com.cloudera.livy.LivyBaseUnitTestSuite
-import com.cloudera.livy.sessions.{PySpark, SessionKindModule}
-
-class CreateInteractiveRequestSpec extends FunSpec with LivyBaseUnitTestSuite {
-
- private val mapper = new ObjectMapper()
- .registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
- .registerModule(new SessionKindModule())
-
- describe("CreateInteractiveRequest") {
-
- it("should have default values for fields after deserialization") {
- val json = """{ "kind" : "pyspark" }"""
- val req = mapper.readValue(json, classOf[CreateInteractiveRequest])
- assert(req.kind === PySpark())
- assert(req.proxyUser === None)
- assert(req.jars === List())
- assert(req.pyFiles === List())
- assert(req.files === List())
- assert(req.driverMemory === None)
- assert(req.driverCores === None)
- assert(req.executorMemory === None)
- assert(req.executorCores === None)
- assert(req.numExecutors === None)
- assert(req.archives === List())
- assert(req.queue === None)
- assert(req.name === None)
- assert(req.conf === Map())
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala
deleted file mode 100644
index c546718..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.interactive
-
-import java.util.concurrent.atomic.AtomicInteger
-import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-import org.json4s.jackson.Json4sScalaModule
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.Entry
-import org.scalatest.mock.MockitoSugar.mock
-
-import com.cloudera.livy.{ExecuteRequest, LivyConf}
-import com.cloudera.livy.client.common.HttpMessages.SessionInfo
-import com.cloudera.livy.rsc.driver.{Statement, StatementState}
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions._
-import com.cloudera.livy.utils.AppInfo
-
-class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
-
- mapper.registerModule(new Json4sScalaModule())
-
- class MockInteractiveSessionServlet(
- sessionManager: InteractiveSessionManager,
- conf: LivyConf)
- extends InteractiveSessionServlet(sessionManager, mock[SessionStore], conf) {
-
- private var statements = IndexedSeq[Statement]()
-
- override protected def createSession(req: HttpServletRequest): InteractiveSession = {
- val statementCounter = new AtomicInteger()
-
- val session = mock[InteractiveSession]
- when(session.kind).thenReturn(Spark())
- when(session.appId).thenReturn(None)
- when(session.appInfo).thenReturn(AppInfo())
- when(session.logLines()).thenReturn(IndexedSeq())
- when(session.state).thenReturn(SessionState.Idle())
- when(session.stop()).thenReturn(Future.successful(()))
- when(session.proxyUser).thenReturn(None)
- when(session.statements).thenAnswer(
- new Answer[IndexedSeq[Statement]]() {
- override def answer(args: InvocationOnMock): IndexedSeq[Statement] = statements
- })
- when(session.executeStatement(any(classOf[ExecuteRequest]))).thenAnswer(
- new Answer[Statement]() {
- override def answer(args: InvocationOnMock): Statement = {
- val id = statementCounter.getAndIncrement
- val statement = new Statement(id, "1+1", StatementState.Available, "1")
-
- statements :+= statement
- statement
- }
- })
- when(session.cancelStatement(anyInt())).thenAnswer(
- new Answer[Unit] {
- override def answer(args: InvocationOnMock): Unit = {
- statements = IndexedSeq(
- new Statement(statementCounter.get(), null, StatementState.Cancelled, null))
- }
- }
- )
-
- session
- }
-
- }
-
- override def createServlet(): InteractiveSessionServlet = {
- val conf = createConf()
- val sessionManager = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq.empty))
- new MockInteractiveSessionServlet(sessionManager, conf)
- }
-
- it("should setup and tear down an interactive session") {
- jget[Map[String, Any]]("/") { data =>
- data("sessions") should equal(Seq())
- }
-
- jpost[Map[String, Any]]("/", createRequest()) { data =>
- header("Location") should equal("/0")
- data("id") should equal (0)
-
- val session = servlet.sessionManager.get(0)
- session should be (defined)
- }
-
- jget[Map[String, Any]]("/0") { data =>
- data("id") should equal (0)
- data("state") should equal ("idle")
-
- val batch = servlet.sessionManager.get(0)
- batch should be (defined)
- }
-
- jpost[Map[String, Any]]("/0/statements", ExecuteRequest("foo")) { data =>
- data("id") should be (0)
- data("code") shouldBe "1+1"
- data("progress") should be (0.0)
- data("output") shouldBe 1
- }
-
- jget[Map[String, Any]]("/0/statements") { data =>
- data("total_statements") should be (1)
- data("statements").asInstanceOf[Seq[Map[String, Any]]](0)("id") should be (0)
- }
-
- jpost[Map[String, Any]]("/0/statements/0/cancel", null, HttpServletResponse.SC_OK) { data =>
- data should equal(Map("msg" -> "canceled"))
- }
-
- jget[Map[String, Any]]("/0/statements") { data =>
- data("total_statements") should be (1)
- data("statements").asInstanceOf[Seq[Map[String, Any]]](0)("state") should be ("cancelled")
- }
-
- jdelete[Map[String, Any]]("/0") { data =>
- data should equal (Map("msg" -> "deleted"))
-
- val session = servlet.sessionManager.get(0)
- session should not be defined
- }
- }
-
- it("should show session properties") {
- val id = 0
- val appId = "appid"
- val owner = "owner"
- val proxyUser = "proxyUser"
- val state = SessionState.Running()
- val kind = Spark()
- val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
- val log = IndexedSeq[String]("log1", "log2")
-
- val session = mock[InteractiveSession]
- when(session.id).thenReturn(id)
- when(session.appId).thenReturn(Some(appId))
- when(session.owner).thenReturn(owner)
- when(session.proxyUser).thenReturn(Some(proxyUser))
- when(session.state).thenReturn(state)
- when(session.kind).thenReturn(kind)
- when(session.appInfo).thenReturn(appInfo)
- when(session.logLines()).thenReturn(log)
-
- val req = mock[HttpServletRequest]
-
- val view = servlet.asInstanceOf[InteractiveSessionServlet].clientSessionView(session, req)
- .asInstanceOf[SessionInfo]
-
- view.id shouldEqual id
- view.appId shouldEqual appId
- view.owner shouldEqual owner
- view.proxyUser shouldEqual proxyUser
- view.state shouldEqual state.toString
- view.kind shouldEqual kind.toString
- view.appInfo should contain (Entry(AppInfo.DRIVER_LOG_URL_NAME, appInfo.driverLogUrl.get))
- view.appInfo should contain (Entry(AppInfo.SPARK_UI_URL_NAME, appInfo.sparkUiUrl.get))
- view.log shouldEqual log.asJava
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala
deleted file mode 100644
index 7d92a43..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.interactive
-
-import java.net.URI
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.apache.spark.launcher.SparkLauncher
-import org.json4s.{DefaultFormats, Extraction, JValue}
-import org.json4s.jackson.JsonMethods.parse
-import org.mockito.{Matchers => MockitoMatchers}
-import org.mockito.Matchers._
-import org.mockito.Mockito.{atLeastOnce, verify, when}
-import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.mock.MockitoSugar.mock
-
-import com.cloudera.livy.{ExecuteRequest, JobHandle, LivyBaseUnitTestSuite, LivyConf}
-import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf}
-import com.cloudera.livy.rsc.driver.StatementState
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions.{PySpark, SessionState, Spark}
-import com.cloudera.livy.utils.{AppInfo, SparkApp}
-
-class InteractiveSessionSpec extends FunSpec
- with Matchers with BeforeAndAfterAll with LivyBaseUnitTestSuite {
-
- private val livyConf = new LivyConf()
- livyConf.set(LivyConf.REPL_JARS, "dummy.jar")
- .set(LivyConf.LIVY_SPARK_VERSION, "1.6.0")
- .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5")
-
- implicit val formats = DefaultFormats
-
- private var session: InteractiveSession = null
-
- private def createSession(
- sessionStore: SessionStore = mock[SessionStore],
- mockApp: Option[SparkApp] = None): InteractiveSession = {
- assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.")
-
- val req = new CreateInteractiveRequest()
- req.kind = PySpark()
- req.driverMemory = Some("512m")
- req.driverCores = Some(1)
- req.executorMemory = Some("512m")
- req.executorCores = Some(1)
- req.name = Some("InteractiveSessionSpec")
- req.conf = Map(
- SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"),
- RSCConf.Entry.LIVY_JARS.key() -> ""
- )
- InteractiveSession.create(0, null, None, livyConf, req, sessionStore, mockApp)
- }
-
- private def executeStatement(code: String): JValue = {
- val id = session.executeStatement(ExecuteRequest(code)).id
- eventually(timeout(30 seconds), interval(100 millis)) {
- val s = session.getStatement(id).get
- s.state.get() shouldBe StatementState.Available
- parse(s.output)
- }
- }
-
- override def afterAll(): Unit = {
- if (session != null) {
- Await.ready(session.stop(), 30 seconds)
- session = null
- }
- super.afterAll()
- }
-
- private def withSession(desc: String)(fn: (InteractiveSession) => Unit): Unit = {
- it(desc) {
- assume(session != null, "No active session.")
- eventually(timeout(30 seconds), interval(100 millis)) {
- session.state shouldBe a[SessionState.Idle]
- }
- fn(session)
- }
- }
-
- describe("A spark session") {
-
- it("should get scala version matched jars with livy.repl.jars") {
- val testedJars = Seq(
- "test_2.10-0.1.jar",
- "local://dummy-path/test/test1_2.10-1.0.jar",
- "file:///dummy-path/test/test2_2.11-1.0-SNAPSHOT.jar",
- "hdfs:///dummy-path/test/test3.jar",
- "non-jar",
- "dummy.jar"
- )
- val livyConf = new LivyConf(false)
- .set(LivyConf.REPL_JARS, testedJars.mkString(","))
- .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2")
- .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10")
- val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf)
- assert(properties(LivyConf.SPARK_JARS).split(",").toSet === Set("test_2.10-0.1.jar",
- "local://dummy-path/test/test1_2.10-1.0.jar",
- "hdfs:///dummy-path/test/test3.jar",
- "dummy.jar"))
-
- livyConf.set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.11")
- val properties1 = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf)
- assert(properties1(LivyConf.SPARK_JARS).split(",").toSet === Set(
- "file:///dummy-path/test/test2_2.11-1.0-SNAPSHOT.jar",
- "hdfs:///dummy-path/test/test3.jar",
- "dummy.jar"))
- }
-
-
- it("should set rsc jars through livy conf") {
- val rscJars = Set(
- "dummy.jar",
- "local:///dummy-path/dummy1.jar",
- "file:///dummy-path/dummy2.jar",
- "hdfs:///dummy-path/dummy3.jar")
- val livyConf = new LivyConf(false)
- .set(LivyConf.REPL_JARS, "dummy.jar")
- .set(LivyConf.RSC_JARS, rscJars.mkString(","))
- .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2")
- .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10")
- val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf)
- // if livy.rsc.jars is configured in LivyConf, it should be passed to RSCConf.
- properties(RSCConf.Entry.LIVY_JARS.key()).split(",").toSet === rscJars
-
- val rscJars1 = Set(
- "foo.jar",
- "local:///dummy-path/foo1.jar",
- "file:///dummy-path/foo2.jar",
- "hdfs:///dummy-path/foo3.jar")
- val properties1 = InteractiveSession.prepareBuilderProp(
- Map(RSCConf.Entry.LIVY_JARS.key() -> rscJars1.mkString(",")), Spark(), livyConf)
- // if rsc jars are configured both in LivyConf and RSCConf, RSCConf should take precedence.
- properties1(RSCConf.Entry.LIVY_JARS.key()).split(",").toSet === rscJars1
- }
-
- it("should start in the idle state") {
- session = createSession()
- session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
- }
-
- it("should update appId and appInfo and session store") {
- val mockApp = mock[SparkApp]
- val sessionStore = mock[SessionStore]
- val session = createSession(sessionStore, Some(mockApp))
-
- val expectedAppId = "APPID"
- session.appIdKnown(expectedAppId)
- session.appId shouldEqual Some(expectedAppId)
-
- val expectedAppInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
- session.infoChanged(expectedAppInfo)
- session.appInfo shouldEqual expectedAppInfo
-
- verify(sessionStore, atLeastOnce()).save(
- MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), anyObject())
- }
-
- withSession("should execute `1 + 2` == 3") { session =>
- val result = executeStatement("1 + 2")
- val expectedResult = Extraction.decompose(Map(
- "status" -> "ok",
- "execution_count" -> 0,
- "data" -> Map(
- "text/plain" -> "3"
- )
- ))
-
- result should equal (expectedResult)
- }
-
- withSession("should report an error if accessing an unknown variable") { session =>
- val result = executeStatement("x")
- val expectedResult = Extraction.decompose(Map(
- "status" -> "error",
- "execution_count" -> 1,
- "ename" -> "NameError",
- "evalue" -> "name 'x' is not defined",
- "traceback" -> List(
- "Traceback (most recent call last):\n",
- "NameError: name 'x' is not defined\n"
- )
- ))
-
- result should equal (expectedResult)
- eventually(timeout(10 seconds), interval(30 millis)) {
- session.state shouldBe a[SessionState.Idle]
- }
- }
-
- withSession("should get statement progress along with statement result") { session =>
- val code =
- """
- |from time import sleep
- |sleep(3)
- """.stripMargin
- val statement = session.executeStatement(ExecuteRequest(code))
- statement.progress should be (0.0)
-
- eventually(timeout(10 seconds), interval(100 millis)) {
- val s = session.getStatement(statement.id).get
- s.state.get() shouldBe StatementState.Available
- s.progress should be (1.0)
- }
- }
-
- withSession("should error out the session if the interpreter dies") { session =>
- session.executeStatement(ExecuteRequest("import os; os._exit(666)"))
- eventually(timeout(30 seconds), interval(100 millis)) {
- session.state shouldBe a[SessionState.Error]
- }
- }
- }
-
- describe("recovery") {
- it("should recover session") {
- val conf = new LivyConf()
- val sessionStore = mock[SessionStore]
- val mockClient = mock[RSCClient]
- when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
- val m =
- InteractiveRecoveryMetadata(
- 78, None, "appTag", Spark(), 0, null, None, Some(URI.create("")))
- val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
-
- s.state shouldBe a[SessionState.Recovering]
-
- s.appIdKnown("appId")
- verify(sessionStore, atLeastOnce()).save(
- MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), anyObject())
- }
-
- it("should recover session to dead state if rscDriverUri is unknown") {
- val conf = new LivyConf()
- val sessionStore = mock[SessionStore]
- val m = InteractiveRecoveryMetadata(
- 78, Some("appId"), "appTag", Spark(), 0, null, None, None)
- val s = InteractiveSession.recover(m, conf, sessionStore, None)
-
- s.state shouldBe a[SessionState.Dead]
- s.logLines().mkString should include("RSCDriver URI is unknown")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala
deleted file mode 100644
index f8c7c3f..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.interactive
-
-import java.io.File
-import java.net.URI
-import java.nio.ByteBuffer
-import java.nio.file.{Files, Paths}
-import javax.servlet.http.HttpServletResponse._
-
-import scala.concurrent.duration._
-import scala.io.Source
-import scala.language.postfixOps
-
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.mock.MockitoSugar.mock
-
-import com.cloudera.livy.{Job, JobHandle}
-import com.cloudera.livy.client.common.{BufferUtils, Serializer}
-import com.cloudera.livy.client.common.HttpMessages._
-import com.cloudera.livy.server.RemoteUserOverride
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions.{InteractiveSessionManager, SessionState}
-import com.cloudera.livy.test.jobs.{Echo, GetCurrentUser}
-
-class JobApiSpec extends BaseInteractiveServletSpec {
-
- private val PROXY = "__proxy__"
-
- private var sessionId: Int = -1
-
- override def createServlet(): InteractiveSessionServlet = {
- val conf = createConf()
- val sessionStore = mock[SessionStore]
- val sessionManager = new InteractiveSessionManager(conf, sessionStore, Some(Seq.empty))
- new InteractiveSessionServlet(sessionManager, sessionStore, conf) with RemoteUserOverride
- }
-
- def withSessionId(desc: String)(fn: (Int) => Unit): Unit = {
- it(desc) {
- assume(sessionId != -1, "No active session.")
- fn(sessionId)
- }
- }
-
- describe("Interactive Servlet") {
-
- it("should create sessions") {
- jpost[SessionInfo]("/", createRequest()) { data =>
- waitForIdle(data.id)
- header("Location") should equal("/0")
- data.id should equal (0)
- sessionId = data.id
- }
- }
-
- withSessionId("should handle asynchronous jobs") { testJobSubmission(_, false) }
-
- withSessionId("should handle synchronous jobs") { testJobSubmission(_, true) }
-
- // Test that the file does get copied over to the live home dir on HDFS - does not test end
- // to end that the RSCClient class copies it over to the app.
- withSessionId("should support file uploads") { id =>
- testResourceUpload("file", id)
- }
-
- withSessionId("should support jar uploads") { id =>
- testResourceUpload("jar", id)
- }
-
- withSessionId("should monitor async Spark jobs") { sid =>
- val ser = new Serializer()
- val job = BufferUtils.toByteArray(ser.serialize(new Echo("hello")))
- var jobId: Long = -1L
- jpost[JobStatus](s"/$sid/submit-job", new SerializedJob(job)) { status =>
- jobId = status.id
- }
-
- eventually(timeout(1 minute), interval(100 millis)) {
- jget[JobStatus](s"/$sid/jobs/$jobId") { status =>
- status.state should be (JobHandle.State.SUCCEEDED)
- }
- }
- }
-
- withSessionId("should update last activity on connect") { sid =>
- val currentActivity = servlet.sessionManager.get(sid).get.lastActivity
- jpost[SessionInfo](s"/$sid/connect", null, expectedStatus = SC_OK) { info =>
- val newActivity = servlet.sessionManager.get(sid).get.lastActivity
- assert(newActivity > currentActivity)
- }
- }
-
- withSessionId("should tear down sessions") { id =>
- jdelete[Map[String, Any]](s"/$id") { data =>
- data should equal (Map("msg" -> "deleted"))
- }
- jget[Map[String, Any]]("/") { data =>
- data("sessions") match {
- case contents: Seq[_] => contents.size should equal (0)
- case _ => fail("Response is not an array.")
- }
- }
-
- // Make sure the session's staging directory was cleaned up.
- assert(tempDir.listFiles().length === 0)
- }
-
- it("should support user impersonation") {
- val headers = makeUserHeaders(PROXY)
- jpost[SessionInfo]("/", createRequest(inProcess = false), headers = headers) { data =>
- try {
- waitForIdle(data.id)
- data.owner should be (PROXY)
- data.proxyUser should be (PROXY)
- val user = runJob(data.id, new GetCurrentUser(), headers = headers)
- user should be (PROXY)
- } finally {
- deleteSession(data.id)
- }
- }
- }
-
- it("should honor impersonation requests") {
- val request = createRequest(inProcess = false)
- request.proxyUser = Some(PROXY)
- jpost[SessionInfo]("/", request, headers = adminHeaders) { data =>
- try {
- waitForIdle(data.id)
- data.owner should be (ADMIN)
- data.proxyUser should be (PROXY)
- val user = runJob(data.id, new GetCurrentUser(), headers = adminHeaders)
- user should be (PROXY)
-
- // Test that files are uploaded to a new session directory.
- assert(tempDir.listFiles().length === 0)
- testResourceUpload("file", data.id)
- } finally {
- deleteSession(data.id)
- assert(tempDir.listFiles().length === 0)
- }
- }
- }
-
- it("should respect config black list") {
- jpost[SessionInfo]("/", createRequest(extraConf = BLACKLISTED_CONFIG),
- expectedStatus = SC_BAD_REQUEST) { _ => }
- }
-
- }
-
- private def waitForIdle(id: Int): Unit = {
- eventually(timeout(1 minute), interval(100 millis)) {
- jget[SessionInfo](s"/$id") { status =>
- status.state should be (SessionState.Idle().toString())
- }
- }
- }
-
- private def deleteSession(id: Int): Unit = {
- jdelete[Map[String, Any]](s"/$id", headers = adminHeaders) { _ => }
- }
-
- private def testResourceUpload(cmd: String, sessionId: Int): Unit = {
- val f = File.createTempFile("uploadTestFile", cmd)
- val conf = createConf()
-
- Files.write(Paths.get(f.getAbsolutePath), "Test data".getBytes())
-
- jupload[Unit](s"/$sessionId/upload-$cmd", Map(cmd -> f), expectedStatus = SC_OK) { _ =>
- // There should be a single directory under the staging dir.
- val subdirs = tempDir.listFiles()
- assert(subdirs.length === 1)
- val stagingDir = subdirs(0).toURI().toString()
-
- val resultFile = new File(new URI(s"$stagingDir/${f.getName}"))
- resultFile.deleteOnExit()
- resultFile.exists() should be(true)
- Source.fromFile(resultFile).mkString should be("Test data")
- }
- }
-
- private def testJobSubmission(sid: Int, sync: Boolean): Unit = {
- val result = runJob(sid, new Echo(42), sync = sync)
- result should be (42)
- }
-
- private def runJob[T](
- sid: Int,
- job: Job[T],
- sync: Boolean = false,
- headers: Map[String, String] = defaultHeaders): T = {
- val ser = new Serializer()
- val jobData = BufferUtils.toByteArray(ser.serialize(job))
- val route = if (sync) s"/$sid/submit-job" else s"/$sid/run-job"
- var jobId: Long = -1L
- jpost[JobStatus](route, new SerializedJob(jobData), headers = headers) { data =>
- jobId = data.id
- }
-
- var result: Option[T] = None
- eventually(timeout(1 minute), interval(100 millis)) {
- jget[JobStatus](s"/$sid/jobs/$jobId") { status =>
- status.id should be (jobId)
- status.state should be (JobHandle.State.SUCCEEDED)
- result = Some(ser.deserialize(ByteBuffer.wrap(status.result)).asInstanceOf[T])
- }
- }
- result.getOrElse(throw new IllegalStateException())
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala
deleted file mode 100644
index 36eb7ef..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.interactive
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.mockito.Mockito.{never, verify, when}
-import org.scalatest.{FunSpec, Matchers}
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.mock.MockitoSugar.mock
-
-import com.cloudera.livy.LivyConf
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions.{Session, SessionManager}
-import com.cloudera.livy.sessions.Session.RecoveryMetadata
-
-class SessionHeartbeatSpec extends FunSpec with Matchers {
- describe("SessionHeartbeat") {
- class TestHeartbeat(override val heartbeatTimeout: FiniteDuration) extends SessionHeartbeat {}
-
- it("should not expire if heartbeat was never called.") {
- val t = new TestHeartbeat(Duration.Zero)
- t.heartbeatExpired shouldBe false
- }
-
- it("should expire if time has elapsed.") {
- val t = new TestHeartbeat(Duration.fromNanos(1))
- t.heartbeat()
- eventually(timeout(2 nano), interval(1 nano)) {
- t.heartbeatExpired shouldBe true
- }
- }
-
- it("should not expire if time hasn't elapsed.") {
- val t = new TestHeartbeat(Duration.create(1, DAYS))
- t.heartbeat()
- t.heartbeatExpired shouldBe false
- }
- }
-
- describe("SessionHeartbeatWatchdog") {
- abstract class TestSession extends Session(0, null, null) with SessionHeartbeat {}
- class TestWatchdog(conf: LivyConf)
- extends SessionManager[TestSession, RecoveryMetadata](
- conf,
- { _ => assert(false).asInstanceOf[TestSession] },
- mock[SessionStore],
- "test",
- Some(Seq.empty))
- with SessionHeartbeatWatchdog[TestSession, RecoveryMetadata] {}
-
- it("should delete only expired sessions") {
- val expiredSession: TestSession = mock[TestSession]
- when(expiredSession.id).thenReturn(0)
- when(expiredSession.heartbeatExpired).thenReturn(true)
-
- val nonExpiredSession: TestSession = mock[TestSession]
- when(nonExpiredSession.id).thenReturn(1)
- when(nonExpiredSession.heartbeatExpired).thenReturn(false)
-
- val n = new TestWatchdog(new LivyConf())
-
- n.register(expiredSession)
- n.register(nonExpiredSession)
- n.deleteExpiredSessions()
-
- verify(expiredSession).stop()
- verify(nonExpiredSession, never).stop()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/BlackholeStateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/BlackholeStateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/BlackholeStateStoreSpec.scala
deleted file mode 100644
index c11feff..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/recovery/BlackholeStateStoreSpec.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import org.scalatest.FunSpec
-import org.scalatest.Matchers._
-
-import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-
-class BlackholeStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
- describe("BlackholeStateStore") {
- val stateStore = new BlackholeStateStore(new LivyConf())
-
- it("set should not throw") {
- stateStore.set("", 1.asInstanceOf[Object])
- }
-
- it("get should return None") {
- val v = stateStore.get[Object]("")
- v shouldBe None
- }
-
- it("getChildren should return empty list") {
- val c = stateStore.getChildren("")
- c shouldBe empty
- }
-
- it("remove should not throw") {
- stateStore.remove("")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala
deleted file mode 100644
index 9b7b0f3..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import java.io.{FileNotFoundException, InputStream, IOException}
-import java.util
-
-import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
-import org.apache.hadoop.fs.permission.FsPermission
-import org.hamcrest.Description
-import org.mockito.ArgumentMatcher
-import org.mockito.Matchers.{any, anyInt, argThat, eq => equal}
-import org.mockito.Mockito.{atLeastOnce, verify, when}
-import org.mockito.internal.matchers.Equals
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.FunSpec
-import org.scalatest.Matchers._
-import org.scalatest.mock.MockitoSugar.mock
-
-import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-
-class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
- describe("FileSystemStateStore") {
- def pathEq(wantedPath: String): Path = argThat(new ArgumentMatcher[Path] {
- private val matcher = new Equals(wantedPath)
-
- override def matches(path: Any): Boolean = matcher.matches(path.toString)
-
- override def describeTo(d: Description): Unit = { matcher.describeTo(d) }
- })
-
- def makeConf(): LivyConf = {
- val conf = new LivyConf()
- conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/")
-
- conf
- }
-
- def mockFileContext(rootDirPermission: String): FileContext = {
- val fileContext = mock[FileContext]
- val rootDirStatus = mock[FileStatus]
- when(fileContext.getFileStatus(any())).thenReturn(rootDirStatus)
- when(rootDirStatus.getPermission).thenReturn(new FsPermission(rootDirPermission))
-
- fileContext
- }
-
- it("should throw if url is not configured") {
- intercept[IllegalArgumentException](new FileSystemStateStore(new LivyConf()))
- }
-
- it("should set and verify file permission") {
- val fileContext = mockFileContext("700")
- new FileSystemStateStore(makeConf(), Some(fileContext))
-
- verify(fileContext).setUMask(new FsPermission("077"))
- }
-
- it("should reject insecure permission") {
- def test(permission: String): Unit = {
- val fileContext = mockFileContext(permission)
-
- intercept[IllegalArgumentException](new FileSystemStateStore(makeConf(), Some(fileContext)))
- }
- test("600")
- test("400")
- test("677")
- test("670")
- test("607")
- }
-
- it("set should write with an intermediate file") {
- val fileContext = mockFileContext("700")
- val outputStream = mock[FSDataOutputStream]
- when(fileContext.create(pathEq("/key.tmp"), any[util.EnumSet[CreateFlag]], any[CreateOpts]))
- .thenReturn(outputStream)
-
- val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
-
- stateStore.set("key", "value")
-
- verify(outputStream).write(""""value"""".getBytes)
- verify(outputStream, atLeastOnce).close()
-
-
- verify(fileContext).rename(pathEq("/key.tmp"), pathEq("/key"), equal(Rename.OVERWRITE))
- verify(fileContext).delete(pathEq("/.key.tmp.crc"), equal(false))
- }
-
- it("get should read file") {
- val fileContext = mockFileContext("700")
- abstract class MockInputStream extends InputStream with Seekable with PositionedReadable {}
- val inputStream: InputStream = mock[MockInputStream]
- when(inputStream.read(any[Array[Byte]](), anyInt(), anyInt())).thenAnswer(new Answer[Int] {
- private var firstCall = true
- override def answer(invocation: InvocationOnMock): Int = {
- if (firstCall) {
- firstCall = false
- val buf = invocation.getArguments()(0).asInstanceOf[Array[Byte]]
- val b = """"value"""".getBytes()
- b.copyToArray(buf)
- b.length
- } else {
- -1
- }
- }
- })
-
- when(fileContext.open(pathEq("/key"))).thenReturn(new FSDataInputStream(inputStream))
-
- val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
-
- stateStore.get[String]("key") shouldBe Some("value")
-
- verify(inputStream, atLeastOnce).close()
- }
-
- it("get non-existent key should return None") {
- val fileContext = mockFileContext("700")
- when(fileContext.open(any())).thenThrow(new FileNotFoundException("Unit test"))
-
- val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
-
- stateStore.get[String]("key") shouldBe None
- }
-
- it("getChildren should list file") {
- val parentPath = "path"
- def makeFileStatus(name: String): FileStatus = {
- val fs = new FileStatus()
- fs.setPath(new Path(parentPath, name))
- fs
- }
- val children = Seq("c1", "c2")
-
- val fileContext = mockFileContext("700")
- val util = mock[FileContext#Util]
- when(util.listStatus(pathEq(s"/$parentPath")))
- .thenReturn(children.map(makeFileStatus).toArray)
- when(fileContext.util()).thenReturn(util)
-
- val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
- stateStore.getChildren(parentPath) should contain theSameElementsAs children
- }
-
- def getChildrenErrorTest(error: Exception): Unit = {
- val parentPath = "path"
-
- val fileContext = mockFileContext("700")
- val util = mock[FileContext#Util]
- when(util.listStatus(pathEq(s"/$parentPath"))).thenThrow(error)
- when(fileContext.util()).thenReturn(util)
-
- val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
- stateStore.getChildren(parentPath) shouldBe empty
- }
-
- it("getChildren should return empty list if the key doesn't exist") {
- getChildrenErrorTest(new IOException("Unit test"))
- }
-
- it("getChildren should return empty list if key doesn't exist") {
- getChildrenErrorTest(new FileNotFoundException("Unit test"))
- }
-
- it("remove should delete file") {
- val fileContext = mockFileContext("700")
-
- val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext))
- stateStore.remove("key")
-
- verify(fileContext).delete(pathEq("/key"), equal(false))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/SessionStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/SessionStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/SessionStoreSpec.scala
deleted file mode 100644
index 3435c2e..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/recovery/SessionStoreSpec.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import scala.util.Success
-
-import org.mockito.Mockito._
-import org.scalatest.FunSpec
-import org.scalatest.Matchers._
-import org.scalatest.mock.MockitoSugar.mock
-
-import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-import com.cloudera.livy.sessions.Session.RecoveryMetadata
-
-class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
- describe("SessionStore") {
- case class TestRecoveryMetadata(id: Int) extends RecoveryMetadata
-
- val sessionType = "test"
- val sessionPath = s"v1/$sessionType"
- val sessionManagerPath = s"v1/$sessionType/state"
-
- val conf = new LivyConf()
- it("should set session state and session counter when saving a session.") {
- val stateStore = mock[StateStore]
- val sessionStore = new SessionStore(conf, stateStore)
-
- val m = TestRecoveryMetadata(99)
- sessionStore.save(sessionType, m)
- verify(stateStore).set(s"$sessionPath/99", m)
- }
-
- it("should return existing sessions") {
- val validMetadata = Map(
- "0" -> Some(TestRecoveryMetadata(0)),
- "5" -> None,
- "77" -> Some(TestRecoveryMetadata(77)))
- val corruptedMetadata = Map(
- "7" -> new RuntimeException("Test"),
- "11212" -> new RuntimeException("Test")
- )
- val stateStore = mock[StateStore]
- val sessionStore = new SessionStore(conf, stateStore)
- when(stateStore.getChildren(sessionPath))
- .thenReturn((validMetadata ++ corruptedMetadata).keys.toList)
-
- validMetadata.foreach { case (id, m) =>
- when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenReturn(m)
- }
-
- corruptedMetadata.foreach { case (id, ex) =>
- when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenThrow(ex)
- }
-
- val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType)
- // Verify normal metadata are retrieved.
- s.filter(_.isSuccess) should contain theSameElementsAs
- validMetadata.values.filter(_.isDefined).map(m => Success(m.get))
- // Verify exceptions are wrapped as in Try and are returned.
- s.filter(_.isFailure) should have size corruptedMetadata.size
- }
-
- it("should not throw if the state store is empty") {
- val stateStore = mock[StateStore]
- val sessionStore = new SessionStore(conf, stateStore)
- when(stateStore.getChildren(sessionPath)).thenReturn(Seq.empty)
-
- val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType)
- s.filter(_.isSuccess) shouldBe empty
- }
-
- it("should return correct next session id") {
- val stateStore = mock[StateStore]
- val sessionStore = new SessionStore(conf, stateStore)
-
- when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(None)
- sessionStore.getNextSessionId(sessionType) shouldBe 0
-
- val sms = SessionManagerState(100)
- when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(Some(sms))
- sessionStore.getNextSessionId(sessionType) shouldBe sms.nextSessionId
- }
-
- it("should remove session") {
- val stateStore = mock[StateStore]
- val sessionStore = new SessionStore(conf, stateStore)
- val id = 1
-
- sessionStore.remove(sessionType, 1)
- verify(stateStore).remove(s"$sessionPath/$id")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/StateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/StateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/StateStoreSpec.scala
deleted file mode 100644
index c0c4918..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/recovery/StateStoreSpec.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import scala.reflect.classTag
-
-import org.scalatest.{BeforeAndAfter, FunSpec}
-import org.scalatest.Matchers._
-
-import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-import com.cloudera.livy.sessions.SessionManager
-
-class StateStoreSpec extends FunSpec with BeforeAndAfter with LivyBaseUnitTestSuite {
- describe("StateStore") {
- after {
- StateStore.cleanup()
- }
-
- def createConf(stateStore: String): LivyConf = {
- val conf = new LivyConf()
- conf.set(LivyConf.RECOVERY_MODE.key, SessionManager.SESSION_RECOVERY_MODE_RECOVERY)
- conf.set(LivyConf.RECOVERY_STATE_STORE.key, stateStore)
- conf
- }
-
- it("should throw an error on get if it's not initialized") {
- intercept[AssertionError] { StateStore.get }
- }
-
- it("should initialize blackhole state store if recovery is disabled") {
- StateStore.init(new LivyConf())
- StateStore.get shouldBe a[BlackholeStateStore]
- }
-
- it("should pick the correct store according to state store config") {
- StateStore.pickStateStore(createConf("filesystem")) shouldBe classOf[FileSystemStateStore]
- StateStore.pickStateStore(createConf("zookeeper")) shouldBe classOf[ZooKeeperStateStore]
- }
-
- it("should return error if an unknown recovery mode is set") {
- val conf = new LivyConf()
- conf.set(LivyConf.RECOVERY_MODE.key, "unknown")
- intercept[IllegalArgumentException] { StateStore.init(conf) }
- }
-
- it("should return error if an unknown state store is set") {
- intercept[IllegalArgumentException] { StateStore.init(createConf("unknown")) }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala
deleted file mode 100644
index 860568f..0000000
--- a/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import scala.collection.JavaConverters._
-
-import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.framework.api._
-import org.apache.curator.framework.listen.Listenable
-import org.apache.zookeeper.data.Stat
-import org.mockito.Mockito._
-import org.scalatest.FunSpec
-import org.scalatest.Matchers._
-import org.scalatest.mock.MockitoSugar.mock
-
-import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-
-class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
- describe("ZooKeeperStateStore") {
- case class TestFixture(stateStore: ZooKeeperStateStore, curatorClient: CuratorFramework)
- val conf = new LivyConf()
- conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
- val key = "key"
- val prefixedKey = s"/livy/$key"
-
- def withMock[R](testBody: TestFixture => R): R = {
- val curatorClient = mock[CuratorFramework]
- when(curatorClient.getUnhandledErrorListenable())
- .thenReturn(mock[Listenable[UnhandledErrorListener]])
- val stateStore = new ZooKeeperStateStore(conf, Some(curatorClient))
- testBody(TestFixture(stateStore, curatorClient))
- }
-
- def mockExistsBuilder(curatorClient: CuratorFramework, exists: Boolean): Unit = {
- val existsBuilder = mock[ExistsBuilder]
- when(curatorClient.checkExists()).thenReturn(existsBuilder)
- if (exists) {
- when(existsBuilder.forPath(prefixedKey)).thenReturn(mock[Stat])
- }
- }
-
- it("should throw on bad config") {
- withMock { f =>
- val conf = new LivyConf()
- intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) }
-
- conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
- conf.set(ZooKeeperStateStore.ZK_RETRY_CONF, "bad")
- intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) }
- }
- }
-
- it("set should use curatorClient") {
- withMock { f =>
- mockExistsBuilder(f.curatorClient, true)
-
- val setDataBuilder = mock[SetDataBuilder]
- when(f.curatorClient.setData()).thenReturn(setDataBuilder)
-
- f.stateStore.set("key", 1.asInstanceOf[Object])
-
- verify(f.curatorClient).start()
- verify(setDataBuilder).forPath(prefixedKey, Array[Byte](49))
- }
- }
-
- it("set should create parents if they don't exist") {
- withMock { f =>
- mockExistsBuilder(f.curatorClient, false)
-
- val createBuilder = mock[CreateBuilder]
- when(f.curatorClient.create()).thenReturn(createBuilder)
- val p = mock[ProtectACLCreateModePathAndBytesable[String]]
- when(createBuilder.creatingParentsIfNeeded()).thenReturn(p)
-
- f.stateStore.set("key", 1.asInstanceOf[Object])
-
- verify(f.curatorClient).start()
- verify(p).forPath(prefixedKey, Array[Byte](49))
- }
- }
-
- it("get should retrieve retry policy configs") {
- conf.set(com.cloudera.livy.server.recovery.ZooKeeperStateStore.ZK_RETRY_CONF, "11,77")
- withMock { f =>
- mockExistsBuilder(f.curatorClient, true)
-
- f.stateStore.retryPolicy should not be null
- f.stateStore.retryPolicy.getN shouldBe 11
- }
- }
-
- it("get should retrieve data from curatorClient") {
- withMock { f =>
- mockExistsBuilder(f.curatorClient, true)
-
- val getDataBuilder = mock[GetDataBuilder]
- when(f.curatorClient.getData()).thenReturn(getDataBuilder)
- when(getDataBuilder.forPath(prefixedKey)).thenReturn(Array[Byte](50))
-
- val v = f.stateStore.get[Int]("key")
-
- verify(f.curatorClient).start()
- v shouldBe Some(2)
- }
- }
-
- it("get should return None if key doesn't exist") {
- withMock { f =>
- mockExistsBuilder(f.curatorClient, false)
-
- val v = f.stateStore.get[Int]("key")
-
- verify(f.curatorClient).start()
- v shouldBe None
- }
- }
-
- it("getChildren should use curatorClient") {
- withMock { f =>
- mockExistsBuilder(f.curatorClient, true)
-
- val getChildrenBuilder = mock[GetChildrenBuilder]
- when(f.curatorClient.getChildren()).thenReturn(getChildrenBuilder)
- val children = List("abc", "def")
- when(getChildrenBuilder.forPath(prefixedKey)).thenReturn(children.asJava)
-
- val c = f.stateStore.getChildren("key")
-
- verify(f.curatorClient).start()
- c shouldBe children
- }
- }
-
- it("getChildren should return empty list if key doesn't exist") {
- withMock { f =>
- mockExistsBuilder(f.curatorClient, false)
-
- val c = f.stateStore.getChildren("key")
-
- verify(f.curatorClient).start()
- c shouldBe empty
- }
- }
-
- it("remove should use curatorClient") {
- withMock { f =>
- val deleteBuilder = mock[DeleteBuilder]
- when(f.curatorClient.delete()).thenReturn(deleteBuilder)
- val g = mock[ChildrenDeletable]
- when(deleteBuilder.guaranteed()).thenReturn(g)
-
- f.stateStore.remove(key)
-
- verify(g).forPath(prefixedKey)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala b/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala
deleted file mode 100644
index 9d129bc..0000000
--- a/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.sessions
-
-import com.cloudera.livy.LivyConf
-
-class MockSession(id: Int, owner: String, conf: LivyConf) extends Session(id, owner, conf) {
- case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata()
-
- override val proxyUser = None
-
- override protected def stopSession(): Unit = ()
-
- override def logLines(): IndexedSeq[String] = IndexedSeq()
-
- override def state: SessionState = SessionState.Idle()
-
- override def recoveryMetadata: RecoveryMetadata = RecoveryMetadata(0)
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
deleted file mode 100644
index ef9b8c1..0000000
--- a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.sessions
-
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.{Failure, Try}
-
-import org.mockito.Mockito.{doReturn, never, verify, when}
-import org.scalatest.{FunSpec, Matchers}
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.mock.MockitoSugar.mock
-
-import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
-import com.cloudera.livy.server.interactive.InteractiveSession
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions.Session.RecoveryMetadata
-
-class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
- implicit def executor: ExecutionContext = ExecutionContext.global
-
- describe("SessionManager") {
- it("should garbage collect old sessions") {
- val livyConf = new LivyConf()
- livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
- val manager = new SessionManager[MockSession, RecoveryMetadata](
- livyConf,
- { _ => assert(false).asInstanceOf[MockSession] },
- mock[SessionStore],
- "test",
- Some(Seq.empty))
- val session = manager.register(new MockSession(manager.nextId(), null, livyConf))
- manager.get(session.id).isDefined should be(true)
- eventually(timeout(5 seconds), interval(100 millis)) {
- Await.result(manager.collectGarbage(), Duration.Inf)
- manager.get(session.id) should be(None)
- }
- }
-
- it("batch session should not be gc-ed until application is finished") {
- val sessionId = 24
- val session = mock[BatchSession]
- when(session.id).thenReturn(sessionId)
- when(session.stop()).thenReturn(Future {})
- when(session.lastActivity).thenReturn(System.nanoTime())
-
- val conf = new LivyConf().set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
- val sm = new BatchSessionManager(conf, mock[SessionStore], Some(Seq(session)))
- testSessionGC(session, sm)
- }
-
- it("interactive session should not gc-ed if session timeout check is off") {
- val sessionId = 24
- val session = mock[InteractiveSession]
- when(session.id).thenReturn(sessionId)
- when(session.stop()).thenReturn(Future {})
- when(session.lastActivity).thenReturn(System.nanoTime())
-
- val conf = new LivyConf().set(LivyConf.SESSION_TIMEOUT_CHECK, false)
- .set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
- val sm = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq(session)))
- testSessionGC(session, sm)
- }
-
- def testSessionGC(session: Session, sm: SessionManager[_, _]): Unit = {
-
- def changeStateAndCheck(s: SessionState)(fn: SessionManager[_, _] => Unit): Unit = {
- doReturn(s).when(session).state
- Await.result(sm.collectGarbage(), Duration.Inf)
- fn(sm)
- }
-
- // Batch session should not be gc-ed when alive
- for (s <- Seq(SessionState.Running(),
- SessionState.Idle(),
- SessionState.Recovering(),
- SessionState.NotStarted(),
- SessionState.Busy(),
- SessionState.ShuttingDown())) {
- changeStateAndCheck(s) { sm => sm.get(session.id) should be (Some(session)) }
- }
-
- // Stopped session should be gc-ed after retained timeout
- for (s <- Seq(SessionState.Error(),
- SessionState.Success(),
- SessionState.Dead())) {
- eventually(timeout(30 seconds), interval(100 millis)) {
- changeStateAndCheck(s) { sm => sm.get(session.id) should be (None) }
- }
- }
- }
- }
-
- describe("BatchSessionManager") {
- implicit def executor: ExecutionContext = ExecutionContext.global
-
- def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = {
- BatchRecoveryMetadata(id, None, appTag, null, None)
- }
-
- def mockSession(id: Int): BatchSession = {
- val session = mock[BatchSession]
- when(session.id).thenReturn(id)
- when(session.stop()).thenReturn(Future {})
- when(session.lastActivity).thenReturn(System.nanoTime())
-
- session
- }
-
- it("should not fail if state store is empty") {
- val conf = new LivyConf()
-
- val sessionStore = mock[SessionStore]
- when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch"))
- .thenReturn(Seq.empty)
-
- val sm = new BatchSessionManager(conf, sessionStore)
- sm.nextId() shouldBe 0
- }
-
- it("should recover sessions from state store") {
- val conf = new LivyConf()
- conf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster")
-
- val sessionType = "batch"
- val nextId = 99
-
- val validMetadata = List(makeMetadata(0, "t1"), makeMetadata(77, "t2")).map(Try(_))
- val invalidMetadata = List(Failure(new Exception("Fake invalid metadata")))
- val sessionStore = mock[SessionStore]
- when(sessionStore.getNextSessionId(sessionType)).thenReturn(nextId)
- when(sessionStore.getAllSessions[BatchRecoveryMetadata](sessionType))
- .thenReturn(validMetadata ++ invalidMetadata)
-
- val sm = new BatchSessionManager(conf, sessionStore)
- sm.nextId() shouldBe nextId
- validMetadata.foreach { m =>
- sm.get(m.get.id) shouldBe defined
- }
- sm.size shouldBe validMetadata.size
- }
-
- it("should delete sessions from state store") {
- val conf = new LivyConf()
-
- val sessionType = "batch"
- val sessionId = 24
- val sessionStore = mock[SessionStore]
- val session = mockSession(sessionId)
-
- val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session)))
- sm.get(sessionId) shouldBe defined
-
- Await.ready(sm.delete(sessionId).get, 30 seconds)
-
- verify(sessionStore).remove(sessionType, sessionId)
- sm.get(sessionId) shouldBe None
- }
-
- it("should delete sessions on shutdown when recovery is off") {
- val conf = new LivyConf()
- val sessionId = 24
- val sessionStore = mock[SessionStore]
- val session = mockSession(sessionId)
-
- val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session)))
- sm.get(sessionId) shouldBe defined
- sm.shutdown()
-
- verify(session).stop()
- }
-
- it("should not delete sessions on shutdown with recovery is on") {
- val conf = new LivyConf()
- conf.set(LivyConf.RECOVERY_MODE, SessionManager.SESSION_RECOVERY_MODE_RECOVERY)
-
- val sessionId = 24
- val sessionStore = mock[SessionStore]
- val session = mockSession(sessionId)
-
- val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session)))
- sm.get(sessionId) shouldBe defined
- sm.shutdown()
-
- verify(session, never).stop()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala b/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala
deleted file mode 100644
index b45a0ed..0000000
--- a/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.sessions
-
-import java.net.URI
-
-import org.scalatest.FunSuite
-
-import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-
-class SessionSpec extends FunSuite with LivyBaseUnitTestSuite {
-
- test("use default fs in paths") {
- val conf = new LivyConf(false)
- conf.hadoopConf.set("fs.defaultFS", "dummy:///")
-
- val uris = Seq("http://example.com/foo", "hdfs:/bar", "/baz")
- val expected = Seq(uris(0), uris(1), "dummy:///baz")
- assert(Session.resolveURIs(uris, conf) === expected)
-
- intercept[IllegalArgumentException] {
- Session.resolveURI(new URI("relative_path"), conf)
- }
- }
-
- test("local fs whitelist") {
- val conf = new LivyConf(false)
- conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed/,/also_allowed")
-
- Seq("/allowed/file", "/also_allowed/file").foreach { path =>
- assert(Session.resolveURI(new URI(path), conf) === new URI("file://" + path))
- }
-
- Seq("/not_allowed", "/allowed_not_really").foreach { path =>
- intercept[IllegalArgumentException] {
- Session.resolveURI(new URI(path), conf)
- }
- }
- }
-
- test("conf validation and preparation") {
- val conf = new LivyConf(false)
- conf.hadoopConf.set("fs.defaultFS", "dummy:///")
- conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed")
-
- // Test baseline.
- assert(Session.prepareConf(Map(), Nil, Nil, Nil, Nil, conf) === Map("spark.master" -> "local"))
-
- // Test validations.
- intercept[IllegalArgumentException] {
- Session.prepareConf(Map("spark.do_not_set" -> "1"), Nil, Nil, Nil, Nil, conf)
- }
- conf.sparkFileLists.foreach { key =>
- intercept[IllegalArgumentException] {
- Session.prepareConf(Map(key -> "file:/not_allowed"), Nil, Nil, Nil, Nil, conf)
- }
- }
- intercept[IllegalArgumentException] {
- Session.prepareConf(Map(), Seq("file:/not_allowed"), Nil, Nil, Nil, conf)
- }
- intercept[IllegalArgumentException] {
- Session.prepareConf(Map(), Nil, Seq("file:/not_allowed"), Nil, Nil, conf)
- }
- intercept[IllegalArgumentException] {
- Session.prepareConf(Map(), Nil, Nil, Seq("file:/not_allowed"), Nil, conf)
- }
- intercept[IllegalArgumentException] {
- Session.prepareConf(Map(), Nil, Nil, Nil, Seq("file:/not_allowed"), conf)
- }
-
- // Test that file lists are merged and resolved.
- val base = "/file1.txt"
- val other = Seq("/file2.txt")
- val expected = Some(Seq("dummy://" + other(0), "dummy://" + base).mkString(","))
-
- val userLists = Seq(LivyConf.SPARK_JARS, LivyConf.SPARK_FILES, LivyConf.SPARK_ARCHIVES,
- LivyConf.SPARK_PY_FILES)
- val baseConf = userLists.map { key => (key -> base) }.toMap
- val result = Session.prepareConf(baseConf, other, other, other, other, conf)
- userLists.foreach { key => assert(result.get(key) === expected) }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
deleted file mode 100644
index b01194a..0000000
--- a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.utils
-
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
-
-import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-import com.cloudera.livy.LivyConf._
-import com.cloudera.livy.server.LivyServer
-
-class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSuite {
-
- import LivySparkUtils._
-
- private val livyConf = new LivyConf()
- private val livyConf210 = new LivyConf()
- livyConf210.set(LIVY_SPARK_SCALA_VERSION, "2.10.6")
-
- private val livyConf211 = new LivyConf()
- livyConf211.set(LIVY_SPARK_SCALA_VERSION, "2.11.1")
-
- test("check for SPARK_HOME") {
- testSparkHome(livyConf)
- }
-
- test("check spark-submit version") {
- testSparkSubmit(livyConf)
- }
-
- test("should support Spark 1.6") {
- testSparkVersion("1.6.0")
- testSparkVersion("1.6.1")
- testSparkVersion("1.6.1-SNAPSHOT")
- testSparkVersion("1.6.2")
- testSparkVersion("1.6")
- testSparkVersion("1.6.3.2.5.0-12")
- }
-
- test("should support Spark 2.0.x") {
- testSparkVersion("2.0.0")
- testSparkVersion("2.0.1")
- testSparkVersion("2.0.2")
- testSparkVersion("2.0.3-SNAPSHOT")
- testSparkVersion("2.0.0.2.5.1.0-56") // LIVY-229
- testSparkVersion("2.0")
- testSparkVersion("2.1.0")
- testSparkVersion("2.1.1")
- }
-
- test("should not support Spark older than 1.6") {
- intercept[IllegalArgumentException] { testSparkVersion("1.4.0") }
- intercept[IllegalArgumentException] { testSparkVersion("1.5.0") }
- intercept[IllegalArgumentException] { testSparkVersion("1.5.1") }
- intercept[IllegalArgumentException] { testSparkVersion("1.5.2") }
- intercept[IllegalArgumentException] { testSparkVersion("1.5.0-cdh5.6.1") }
- }
-
- test("should fail on bad version") {
- intercept[IllegalArgumentException] { testSparkVersion("not a version") }
- }
-
- test("should error out if recovery is turned on but master isn't yarn") {
- val livyConf = new LivyConf()
- livyConf.set(LivyConf.LIVY_SPARK_MASTER, "local")
- livyConf.set(LivyConf.RECOVERY_MODE, "recovery")
- val s = new LivyServer()
- intercept[IllegalArgumentException] { s.testRecovery(livyConf) }
- }
-
- test("formatScalaVersion() should format Scala version") {
- formatScalaVersion("2.10.8") shouldBe "2.10"
- formatScalaVersion("2.11.4") shouldBe "2.11"
- formatScalaVersion("2.10") shouldBe "2.10"
- formatScalaVersion("2.10.x.x.x.x") shouldBe "2.10"
-
- // Throw exception for bad Scala version.
- intercept[IllegalArgumentException] { formatScalaVersion("") }
- intercept[IllegalArgumentException] { formatScalaVersion("xxx") }
- }
-
- test("defaultSparkScalaVersion() should return default Scala version") {
- defaultSparkScalaVersion(formatSparkVersion("1.6.0")) shouldBe "2.10"
- defaultSparkScalaVersion(formatSparkVersion("1.6.1")) shouldBe "2.10"
- defaultSparkScalaVersion(formatSparkVersion("1.6.2")) shouldBe "2.10"
- defaultSparkScalaVersion(formatSparkVersion("2.0.0")) shouldBe "2.11"
- defaultSparkScalaVersion(formatSparkVersion("2.0.1")) shouldBe "2.11"
-
- // Throw exception for unsupported Spark version.
- intercept[IllegalArgumentException] { defaultSparkScalaVersion(formatSparkVersion("1.5.0")) }
- }
-
- test("sparkScalaVersion() should use spark-submit detected Scala version.") {
- sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.10"), livyConf) shouldBe "2.10"
- sparkScalaVersion(formatSparkVersion("1.6.0"), Some("2.11"), livyConf) shouldBe "2.11"
- }
-
- test("sparkScalaVersion() should throw if configured and detected Scala version mismatch.") {
- intercept[IllegalArgumentException] {
- sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.11"), livyConf210)
- }
- intercept[IllegalArgumentException] {
- sparkScalaVersion(formatSparkVersion("1.6.1"), Some("2.10"), livyConf211)
- }
- }
-
- test("sparkScalaVersion() should use configured Scala version if spark-submit doesn't tell.") {
- sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf210) shouldBe "2.10"
- sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf210) shouldBe "2.10"
- sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf210) shouldBe "2.10"
- sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf210) shouldBe "2.10"
- sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf211) shouldBe "2.11"
- sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf211) shouldBe "2.11"
- sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf211) shouldBe "2.11"
- sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf211) shouldBe "2.11"
- }
-
- test("sparkScalaVersion() should use default Spark Scala version.") {
- sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf) shouldBe "2.10"
- sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf) shouldBe "2.10"
- sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf) shouldBe "2.11"
- sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf) shouldBe "2.11"
- sparkScalaVersion(formatSparkVersion("2.1.0"), None, livyConf) shouldBe "2.11"
- }
-}