Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:58 UTC
[29/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy
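
For code built against Livy, the rename shows up in both imports and Maven coordinates. A minimal before/after sketch, using two imports and the coordinates that appear in the diffs below:

    // Before (groupId com.cloudera.livy, version 0.4.0-SNAPSHOT):
    import com.cloudera.livy.{LivyConf, Logging}

    // After (groupId org.apache.livy, version 0.4.0-incubating-SNAPSHOT):
    import org.apache.livy.{LivyConf, Logging}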
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index e32ca78..e15c470 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -20,14 +20,14 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-main</artifactId>
<relativePath>../pom.xml</relativePath>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>livy-integration-test</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala
deleted file mode 100644
index 039ee0a..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.framework
-
-import java.io.File
-import java.util.UUID
-
-import scala.concurrent._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.control.NonFatal
-
-import com.ning.http.client.AsyncHttpClient
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.scalatest._
-
-abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with BeforeAndAfterAll {
- import scala.concurrent.ExecutionContext.Implicits.global
-
- var cluster: Cluster = _
- var httpClient: AsyncHttpClient = _
- var livyClient: LivyRestClient = _
-
- protected def livyEndpoint: String = cluster.livyEndpoint
-
- protected val testLib = sys.props("java.class.path")
- .split(File.pathSeparator)
- .find(new File(_).getName().startsWith("livy-test-lib-"))
- .getOrElse(throw new Exception(s"Cannot find test lib in ${sys.props("java.class.path")}"))
-
- protected def getYarnLog(appId: String): String = {
- require(appId != null, "appId shouldn't be null")
-
- val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
- assert(appReport != null, "appReport shouldn't be null")
-
- appReport.getDiagnostics()
- }
-
- protected def restartLivy(): Unit = {
- val f = future {
- cluster.stopLivy()
- cluster.runLivy()
- }
- Await.result(f, 3 minutes)
- }
-
- /** Uploads a file to HDFS and returns just its path. */
- protected def uploadToHdfs(file: File): String = {
- val hdfsPath = new Path(cluster.hdfsScratchDir(),
- UUID.randomUUID().toString() + "-" + file.getName())
- cluster.fs.copyFromLocalFile(new Path(file.toURI()), hdfsPath)
- hdfsPath.toUri().getPath()
- }
-
- /** Wrapper around test() to be used by pyspark tests. */
- protected def pytest(desc: String)(testFn: => Unit): Unit = {
- test(desc) {
- assume(cluster.isRealSpark(), "PySpark tests require a real Spark installation.")
- testFn
- }
- }
-
- /** Wrapper around test() to be used by SparkR tests. */
- protected def rtest(desc: String)(testFn: => Unit): Unit = {
- test(desc) {
- assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
- assume(cluster.isRealSpark(), "SparkR tests require a real Spark installation.")
- assume(cluster.hasSparkR(), "Spark under test does not support R.")
- testFn
- }
- }
-
- /** Clean up session and show info when test fails. */
- protected def withSession[S <: LivyRestClient#Session, R]
- (s: S)
- (f: (S) => R): R = {
- try {
- f(s)
- } catch {
- case NonFatal(e) =>
- try {
- val state = s.snapshot()
- info(s"Final session state: $state")
- state.appId.foreach { id => info(s"YARN diagnostics: ${getYarnLog(id)}") }
- } catch { case NonFatal(_) => }
- throw e
- } finally {
- try {
- s.stop()
- } catch {
- case NonFatal(e) => alert(s"Failed to stop session: $e")
- }
- }
- }
-
- // We need beforeAll() here because BatchIT's beforeAll() has to be executed after this.
- // Please create an issue if this breaks test logging for cluster creation.
- protected override def beforeAll() = {
- cluster = Cluster.get()
- httpClient = new AsyncHttpClient()
- livyClient = new LivyRestClient(httpClient, livyEndpoint)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/Cluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/Cluster.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/Cluster.scala
deleted file mode 100644
index 06fa11d..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/Cluster.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.framework
-
-import java.io._
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.client.api.YarnClient
-
-import com.cloudera.livy.Logging
-
-/**
- * A common interface for running tests on a real cluster or a mini cluster.
- */
-trait Cluster {
- def deploy(): Unit
- def cleanUp(): Unit
- def configDir(): File
- def isRealSpark(): Boolean
- def hasSparkR(): Boolean
-
- def runLivy(): Unit
- def stopLivy(): Unit
- def livyEndpoint: String
- def hdfsScratchDir(): Path
-
- def doAsClusterUser[T](task: => T): T
-
- lazy val hadoopConf = {
- val conf = new Configuration(false)
- configDir().listFiles().foreach { f =>
- if (f.getName().endsWith(".xml")) {
- conf.addResource(new Path(f.toURI()))
- }
- }
- conf
- }
-
- lazy val yarnConf = {
- val conf = new Configuration(false)
- conf.addResource(new Path(s"${configDir().getCanonicalPath}/yarn-site.xml"))
- conf
- }
-
- lazy val fs = doAsClusterUser {
- FileSystem.get(hadoopConf)
- }
-
- lazy val yarnClient = doAsClusterUser {
- val c = YarnClient.createYarnClient()
- c.init(yarnConf)
- c.start()
- c
- }
-}
-
-object Cluster extends Logging {
- private val CLUSTER_TYPE = "cluster.type"
-
- private lazy val config = {
- sys.props.get("cluster.spec")
- .filter { path => path.nonEmpty && path != "default" }
- .map { path =>
- val in = Option(getClass.getClassLoader.getResourceAsStream(path))
- .getOrElse(new FileInputStream(path))
- val p = new Properties()
- val reader = new InputStreamReader(in, UTF_8)
- try {
- p.load(reader)
- } finally {
- reader.close()
- }
- p.asScala.toMap
- }
- .getOrElse(Map(CLUSTER_TYPE -> "mini"))
- }
-
- private lazy val cluster = {
- var _cluster: Cluster = null
- try {
- _cluster = config.get(CLUSTER_TYPE) match {
- case Some("real") => new RealCluster(config)
- case Some("mini") => new MiniCluster(config)
- case t => throw new Exception(s"Unknown or unset cluster.type $t")
- }
- Runtime.getRuntime.addShutdownHook(new Thread {
- override def run(): Unit = {
- info("Shutting down cluster pool.")
- _cluster.cleanUp()
- }
- })
- _cluster.deploy()
- } catch {
- case e: Throwable =>
- error("Failed to initialize cluster.", e)
- Option(_cluster).foreach { c =>
- Try(c.cleanUp()).recover { case e =>
- error("Furthermore, failed to clean up cluster after failure.", e)
- }
- }
- throw e
- }
- _cluster
- }
-
- def get(): Cluster = cluster
-
- def isRunningOnTravis: Boolean = sys.env.contains("TRAVIS")
-}
-
-trait ClusterUtils {
-
- protected def saveProperties(props: Map[String, String], dest: File): Unit = {
- val jprops = new Properties()
- props.foreach { case (k, v) => jprops.put(k, v) }
-
- val tempFile = new File(dest.getAbsolutePath() + ".tmp")
- val out = new OutputStreamWriter(new FileOutputStream(tempFile), UTF_8)
- try {
- jprops.store(out, "Configuration")
- } finally {
- out.close()
- }
- tempFile.renameTo(dest)
- }
-
- protected def loadProperties(file: File): Map[String, String] = {
- val in = new InputStreamReader(new FileInputStream(file), UTF_8)
- val props = new Properties()
- try {
- props.load(in)
- } finally {
- in.close()
- }
- props.asScala.toMap
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala
deleted file mode 100644
index ec63193..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.test.framework
-
-import java.util.regex.Pattern
-import javax.servlet.http.HttpServletResponse
-
-import scala.annotation.tailrec
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.{Either, Left, Right}
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import com.ning.http.client.AsyncHttpClient
-import com.ning.http.client.Response
-import org.apache.hadoop.yarn.api.records.ApplicationId
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy.server.batch.CreateBatchRequest
-import com.cloudera.livy.server.interactive.CreateInteractiveRequest
-import com.cloudera.livy.sessions.{Kind, SessionKindModule, SessionState}
-import com.cloudera.livy.utils.AppInfo
-
-object LivyRestClient {
- private val BATCH_TYPE = "batches"
- private val INTERACTIVE_TYPE = "sessions"
-
- // TODO Define these in production code and share them with test code.
- @JsonIgnoreProperties(ignoreUnknown = true)
- private case class StatementResult(id: Int, state: String, output: Map[String, Any])
-
- @JsonIgnoreProperties(ignoreUnknown = true)
- case class StatementError(ename: String, evalue: String, stackTrace: Seq[String])
-
- @JsonIgnoreProperties(ignoreUnknown = true)
- case class SessionSnapshot(
- id: Int,
- appId: Option[String],
- state: String,
- appInfo: AppInfo,
- log: IndexedSeq[String])
-}
-
-class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) {
- import LivyRestClient._
-
- val mapper = new ObjectMapper()
- .registerModule(DefaultScalaModule)
- .registerModule(new SessionKindModule())
-
- class Session(val id: Int, sessionType: String) {
- val url: String = s"$livyEndpoint/$sessionType/$id"
-
- def appId(): ApplicationId = {
- ConverterUtils.toApplicationId(snapshot().appId.get)
- }
-
- def snapshot(): SessionSnapshot = {
- val r = httpClient.prepareGet(url).execute().get()
- assertStatusCode(r, HttpServletResponse.SC_OK)
-
- mapper.readValue(r.getResponseBodyAsStream, classOf[SessionSnapshot])
- }
-
- def stop(): Unit = {
- httpClient.prepareDelete(url).execute().get()
-
- eventually(timeout(30 seconds), interval(1 second)) {
- verifySessionDoesNotExist()
- }
- }
-
- def verifySessionState(state: SessionState): Unit = {
- verifySessionState(Set(state))
- }
-
- def verifySessionState(states: Set[SessionState]): Unit = {
- val t = if (Cluster.isRunningOnTravis) 5.minutes else 2.minutes
- val strStates = states.map(_.toString)
- // Travis uses very slow VMs, so a longer timeout is needed.
- // Keeping the original timeout to avoid slowing down local development.
- eventually(timeout(t), interval(1 second)) {
- val s = snapshot().state
- assert(strStates.contains(s), s"Session $id state $s doesn't equal one of $strStates")
- }
- }
-
- def verifySessionDoesNotExist(): Unit = {
- val r = httpClient.prepareGet(url).execute().get()
- assertStatusCode(r, HttpServletResponse.SC_NOT_FOUND)
- }
- }
-
- class BatchSession(id: Int) extends Session(id, BATCH_TYPE) {
- def verifySessionDead(): Unit = verifySessionState(SessionState.Dead())
- def verifySessionRunning(): Unit = verifySessionState(SessionState.Running())
- def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success())
- }
-
- class InteractiveSession(id: Int) extends Session(id, INTERACTIVE_TYPE) {
- class Statement(code: String) {
- val stmtId = {
- val requestBody = Map("code" -> code)
- val r = httpClient.preparePost(s"$url/statements")
- .setBody(mapper.writeValueAsString(requestBody))
- .execute()
- .get()
- assertStatusCode(r, HttpServletResponse.SC_CREATED)
-
- val newStmt = mapper.readValue(r.getResponseBodyAsStream, classOf[StatementResult])
- newStmt.id
- }
-
- final def result(): Either[String, StatementError] = {
- eventually(timeout(1 minute), interval(1 second)) {
- val r = httpClient.prepareGet(s"$url/statements/$stmtId")
- .execute()
- .get()
- assertStatusCode(r, HttpServletResponse.SC_OK)
-
- val newStmt = mapper.readValue(r.getResponseBodyAsStream, classOf[StatementResult])
- assert(newStmt.state == "available", s"Statement isn't available: ${newStmt.state}")
-
- val output = newStmt.output
- output.get("status") match {
- case Some("ok") =>
- val data = output("data").asInstanceOf[Map[String, Any]]
- var rst = data.getOrElse("text/plain", "")
- val magicRst = data.getOrElse("application/vnd.livy.table.v1+json", null)
- if (magicRst != null) {
- rst = mapper.writeValueAsString(magicRst)
- }
- Left(rst.asInstanceOf[String])
- case Some("error") => Right(mapper.convertValue(output, classOf[StatementError]))
- case Some(status) =>
- throw new IllegalStateException(s"Unknown statement $stmtId status: $status")
- case None =>
- throw new IllegalStateException(s"Unknown statement $stmtId output: $newStmt")
- }
- }
- }
-
- def verifyResult(expectedRegex: String): Unit = {
- result() match {
- case Left(result) =>
- if (expectedRegex != null) {
- matchStrings(result, expectedRegex)
- }
- case Right(error) =>
- assert(false, s"Got error from statement $stmtId $code: ${error.evalue}")
- }
- }
-
- def verifyError(
- ename: String = null, evalue: String = null, stackTrace: String = null): Unit = {
- result() match {
- case Left(result) =>
- assert(false, s"Statement $stmtId `$code` expected to fail, but succeeded.")
- case Right(error) =>
- val remoteStack = Option(error.stackTrace).getOrElse(Nil).mkString("\n")
- Seq(error.ename -> ename, error.evalue -> evalue, remoteStack -> stackTrace).foreach {
- case (actual, expected) if expected != null => matchStrings(actual, expected)
- case _ =>
- }
- }
- }
-
- private def matchStrings(actual: String, expected: String): Unit = {
- val regex = Pattern.compile(expected, Pattern.DOTALL)
- assert(regex.matcher(actual).matches(), s"$actual did not match regex $expected")
- }
- }
-
- def run(code: String): Statement = { new Statement(code) }
-
- def runFatalStatement(code: String): Unit = {
- val requestBody = Map("code" -> code)
- val r = httpClient.preparePost(s"$url/statements")
- .setBody(mapper.writeValueAsString(requestBody))
- .execute()
-
- verifySessionState(SessionState.Dead())
- }
-
- def verifySessionIdle(): Unit = {
- verifySessionState(SessionState.Idle())
- }
- }
-
- def startBatch(
- file: String,
- className: Option[String],
- args: List[String],
- sparkConf: Map[String, String]): BatchSession = {
- val r = new CreateBatchRequest()
- r.file = file
- r.className = className
- r.args = args
- r.conf = Map("spark.yarn.maxAppAttempts" -> "1") ++ sparkConf
-
- val id = start(BATCH_TYPE, mapper.writeValueAsString(r))
- new BatchSession(id)
- }
-
- def startSession(
- kind: Kind,
- sparkConf: Map[String, String],
- heartbeatTimeoutInSecond: Int): InteractiveSession = {
- val r = new CreateInteractiveRequest()
- r.kind = kind
- r.conf = sparkConf
- r.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond
-
- val id = start(INTERACTIVE_TYPE, mapper.writeValueAsString(r))
- new InteractiveSession(id)
- }
-
- def connectSession(id: Int): InteractiveSession = { new InteractiveSession(id) }
-
- private def start(sessionType: String, body: String): Int = {
- val r = httpClient.preparePost(s"$livyEndpoint/$sessionType")
- .setBody(body)
- .execute()
- .get()
-
- assertStatusCode(r, HttpServletResponse.SC_CREATED)
-
- val newSession = mapper.readValue(r.getResponseBodyAsStream, classOf[SessionSnapshot])
- newSession.id
- }
-
- private def assertStatusCode(r: Response, expected: Int): Unit = {
- def pretty(r: Response): String = {
- s"${r.getStatusCode} ${r.getResponseBody}"
- }
- assert(r.getStatusCode() == expected, s"HTTP status code != $expected: ${pretty(r)}")
- }
-}
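LivyRestClient wraps Livy's REST API for the integration tests: start() POSTs to /batches or /sessions, and the verify* helpers poll the session state. A hedged usage sketch; the endpoint, jar path, and main class below are placeholders, not values from this commit:

    import com.ning.http.client.AsyncHttpClient

    val livy = new LivyRestClient(new AsyncHttpClient(), "http://localhost:8998")
    // startBatch expects HTTP 201 and returns a BatchSession for the new id.
    val batch = livy.startBatch(
      file = "hdfs:///tmp/example-app.jar",        // placeholder: jar already on HDFS
      className = Some("com.example.ExampleApp"),  // placeholder main class
      args = List(),
      sparkConf = Map())
    batch.verifySessionRunning()   // polls GET /batches/<id> until the state matches
    batch.verifySessionSuccess()
    batch.stop()                   // DELETE, then waits for a 404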
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala
deleted file mode 100644
index edb25c6..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.framework
-
-import java.io._
-import java.nio.charset.Charset
-import java.nio.file.{Files, Paths}
-import javax.servlet.http.HttpServletResponse
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.ning.http.client.AsyncHttpClient
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hdfs.MiniDFSCluster
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.server.MiniYARNCluster
-import org.apache.spark.launcher.SparkLauncher
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.client.common.TestUtils
-import com.cloudera.livy.server.LivyServer
-
-private class MiniClusterConfig(val config: Map[String, String]) {
-
- val nmCount = getInt("yarn.nm-count", 1)
- val localDirCount = getInt("yarn.local-dir-count", 1)
- val logDirCount = getInt("yarn.log-dir-count", 1)
- val dnCount = getInt("hdfs.dn-count", 1)
-
- private def getInt(key: String, default: Int): Int = {
- config.get(key).map(_.toInt).getOrElse(default)
- }
-
-}
-
-sealed trait MiniClusterUtils extends ClusterUtils {
- private val livySparkScalaVersionEnvVarName = "LIVY_SPARK_SCALA_VERSION"
-
- protected def getSparkScalaVersion(): String = {
- sys.env.getOrElse(livySparkScalaVersionEnvVarName, {
- throw new RuntimeException(s"Please specify env var $livySparkScalaVersionEnvVarName.")
- })
- }
-
- protected def saveConfig(conf: Configuration, dest: File): Unit = {
- val redacted = new Configuration(conf)
- // This setting references a test class that is not available when using a real Spark
- // installation, so remove it from client configs.
- redacted.unset("net.topology.node.switch.mapping.impl")
-
- val out = new FileOutputStream(dest)
- try {
- redacted.writeXml(out)
- } finally {
- out.close()
- }
- }
-
-}
-
-sealed abstract class MiniClusterBase extends MiniClusterUtils with Logging {
-
- def main(args: Array[String]): Unit = {
- val klass = getClass().getSimpleName()
-
- info(s"$klass is starting up.")
-
- val Array(configPath) = args
- val config = {
- val file = new File(s"$configPath/cluster.conf")
- val props = loadProperties(file)
- new MiniClusterConfig(props)
- }
- start(config, configPath)
-
- info(s"$klass running.")
-
- while (true) synchronized {
- wait()
- }
- }
-
- protected def start(config: MiniClusterConfig, configPath: String): Unit
-
-}
-
-object MiniHdfsMain extends MiniClusterBase {
-
- override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
- val hadoopConf = new Configuration()
- val hdfsCluster = new MiniDFSCluster.Builder(hadoopConf)
- .numDataNodes(config.dnCount)
- .format(true)
- .waitSafeMode(true)
- .build()
-
- hdfsCluster.waitActive()
-
- saveConfig(hadoopConf, new File(configPath + "/core-site.xml"))
- }
-
-}
-
-object MiniYarnMain extends MiniClusterBase {
-
- override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
- val baseConfig = new YarnConfiguration()
- var yarnCluster = new MiniYARNCluster(getClass().getName(), config.nmCount,
- config.localDirCount, config.logDirCount)
- yarnCluster.init(baseConfig)
-
- // Install a shutdown hook to stop the service and kill all running applications.
- Runtime.getRuntime().addShutdownHook(new Thread() {
- override def run(): Unit = yarnCluster.stop()
- })
-
- yarnCluster.start()
-
- // Workaround for YARN-2642.
- val yarnConfig = yarnCluster.getConfig()
- eventually(timeout(30 seconds), interval(100 millis)) {
- assert(yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0",
- "RM not up yes.")
- }
-
- info(s"RM address in configuration is ${yarnConfig.get(YarnConfiguration.RM_ADDRESS)}")
- saveConfig(yarnConfig, new File(configPath + "/yarn-site.xml"))
- }
-
-}
-
-object MiniLivyMain extends MiniClusterBase {
- var livyUrl: Option[String] = None
-
- def start(config: MiniClusterConfig, configPath: String): Unit = {
- var livyConf = Map(
- LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
- LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
- LivyConf.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(),
- LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
- LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
- LivyConf.RECOVERY_MODE.key -> "recovery",
- LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
- LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store")
-
- if (Cluster.isRunningOnTravis) {
- livyConf ++= Map("livy.server.yarn.app-lookup-timeout" -> "2m")
- }
-
- saveProperties(livyConf, new File(configPath + "/livy.conf"))
-
- val server = new LivyServer()
- server.start()
- server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, true)
- // Write a serverUrl.conf file to the conf directory with the location of the Livy
- // server. Do it atomically since it's used by MiniCluster to detect when the Livy server
- // is up and ready.
- eventually(timeout(30 seconds), interval(1 second)) {
- val serverUrlConf = Map("livy.server.server-url" -> server.serverUrl())
- saveProperties(serverUrlConf, new File(configPath + "/serverUrl.conf"))
- }
- }
-}
-
-private case class ProcessInfo(process: Process, logFile: File)
-
-/**
- * Cluster implementation that uses HDFS / YARN mini clusters running as sub-processes locally.
- * Launching Livy through this mini cluster results in three child processes:
- *
- * - A HDFS mini cluster
- * - A YARN mini cluster
- * - The Livy server
- *
- * Each service will write its client configuration to a temporary directory managed by the
- * framework, so that applications can connect to the services.
- *
- * TODO: add support for MiniKdc.
- */
-class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterUtils with Logging {
- private val tempDir = new File(s"${sys.props("java.io.tmpdir")}/livy-int-test")
- private var sparkConfDir: File = _
- private var _configDir: File = _
- private var hdfs: Option[ProcessInfo] = None
- private var yarn: Option[ProcessInfo] = None
- private var livy: Option[ProcessInfo] = None
- private var livyUrl: String = _
- private var _hdfsScrathDir: Path = _
-
- override def configDir(): File = _configDir
-
- override def hdfsScratchDir(): Path = _hdfsScrathDir
-
- override def isRealSpark(): Boolean = {
- new File(sys.env("SPARK_HOME") + File.separator + "RELEASE").isFile()
- }
-
- override def hasSparkR(): Boolean = {
- val path = Seq(sys.env("SPARK_HOME"), "R", "lib", "sparkr.zip").mkString(File.separator)
- new File(path).isFile()
- }
-
- override def doAsClusterUser[T](task: => T): T = task
-
- // Explicitly remove the "test-lib" dependency from the classpath of child processes. We
- // want tests to explicitly upload this jar when necessary, to test those code paths.
- private val childClasspath = {
- val cp = sys.props("java.class.path").split(File.pathSeparator)
- val filtered = cp.filter { path => !new File(path).getName().startsWith("livy-test-lib-") }
- assert(cp.size != filtered.size, "livy-test-lib jar not found in classpath!")
- filtered.mkString(File.pathSeparator)
- }
-
- override def deploy(): Unit = {
- if (tempDir.exists()) {
- FileUtils.deleteQuietly(tempDir)
- }
- assert(tempDir.mkdir(), "Cannot create temp test dir.")
- sparkConfDir = mkdir("spark-conf")
-
- // When running a real Spark cluster, don't set the classpath.
- val extraCp = if (!isRealSpark()) {
- val sparkScalaVersion = getSparkScalaVersion()
- val classPathFile =
- new File(s"minicluster-dependencies/scala-$sparkScalaVersion/target/classpath")
- assert(classPathFile.isFile,
- s"Cannot read MiniCluster classpath file: ${classPathFile.getCanonicalPath}")
- val sparkClassPath =
- FileUtils.readFileToString(classPathFile, Charset.defaultCharset())
-
- val dummyJar = Files.createTempFile(Paths.get(tempDir.toURI), "dummy", "jar").toFile
- Map(
- SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sparkClassPath,
- SparkLauncher.EXECUTOR_EXTRA_CLASSPATH -> sparkClassPath,
- // Needed for Spark 2.0: in YARN mode Spark uploads the specified jars to the
- // distributed cache; when none are specified it falls back to the jars folder,
- // which does not exist here and would throw an exception.
- "spark.yarn.jars" -> dummyJar.getAbsolutePath)
- } else {
- Map()
- }
-
- val sparkConf = extraCp ++ Map(
- "spark.executor.instances" -> "1",
- "spark.scheduler.minRegisteredResourcesRatio" -> "0.0",
- "spark.ui.enabled" -> "false",
- SparkLauncher.DRIVER_MEMORY -> "512m",
- SparkLauncher.EXECUTOR_MEMORY -> "512m",
- SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console",
- SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console"
- )
- saveProperties(sparkConf, new File(sparkConfDir, "spark-defaults.conf"))
-
- _configDir = mkdir("hadoop-conf")
- saveProperties(config, new File(configDir, "cluster.conf"))
- hdfs = Some(start(MiniHdfsMain.getClass, new File(configDir, "core-site.xml")))
- yarn = Some(start(MiniYarnMain.getClass, new File(configDir, "yarn-site.xml")))
- runLivy()
-
- _hdfsScrathDir = fs.makeQualified(new Path("/"))
- }
-
- override def cleanUp(): Unit = {
- Seq(hdfs, yarn, livy).flatten.foreach(stop)
- hdfs = None
- yarn = None
- livy = None
- }
-
- def runLivy(): Unit = {
- assert(!livy.isDefined)
- val confFile = new File(configDir, "serverUrl.conf")
- val jacocoArgs = Option(TestUtils.getJacocoArgs())
- .map { args =>
- Seq(args, s"-Djacoco.args=$args")
- }.getOrElse(Nil)
- val localLivy = start(MiniLivyMain.getClass, confFile, extraJavaArgs = jacocoArgs)
-
- val props = loadProperties(confFile)
- livyUrl = props("livy.server.server-url")
-
- // Wait until Livy server responds.
- val httpClient = new AsyncHttpClient()
- eventually(timeout(30 seconds), interval(1 second)) {
- val res = httpClient.prepareGet(livyUrl + "/metrics").execute().get()
- assert(res.getStatusCode() == HttpServletResponse.SC_OK)
- }
-
- livy = Some(localLivy)
- }
-
- def stopLivy(): Unit = {
- assert(livy.isDefined)
- livy.foreach(stop)
- livyUrl = null
- livy = None
- }
-
- def livyEndpoint: String = livyUrl
-
- private def mkdir(name: String, parent: File = tempDir): File = {
- val dir = new File(parent, name)
- if (!dir.exists()) {
- assert(dir.mkdir(), s"Failed to create directory $name.")
- }
- dir
- }
-
- private def start(
- klass: Class[_],
- configFile: File,
- extraJavaArgs: Seq[String] = Nil): ProcessInfo = {
- val simpleName = klass.getSimpleName().stripSuffix("$")
- val procDir = mkdir(simpleName)
- val procTmp = mkdir("tmp", parent = procDir)
-
- // Before starting anything, kill any previously running instances.
- sys.process.Process(s"pkill -f $simpleName") !
-
- val java = sys.props("java.home") + "/bin/java"
- val cmd =
- Seq(
- sys.props("java.home") + "/bin/java",
- "-Dtest.appender=console",
- "-Djava.io.tmpdir=" + procTmp.getAbsolutePath(),
- "-cp", childClasspath + File.pathSeparator + configDir.getAbsolutePath(),
- "-XX:MaxPermSize=256m") ++
- extraJavaArgs ++
- Seq(
- klass.getName().stripSuffix("$"),
- configDir.getAbsolutePath())
-
- val logFile = new File(procDir, "output.log")
- val pb = new ProcessBuilder(cmd.toArray: _*)
- .directory(procDir)
- .redirectErrorStream(true)
- .redirectOutput(ProcessBuilder.Redirect.appendTo(logFile))
-
- pb.environment().put("LIVY_CONF_DIR", configDir.getAbsolutePath())
- pb.environment().put("HADOOP_CONF_DIR", configDir.getAbsolutePath())
- pb.environment().put("SPARK_CONF_DIR", sparkConfDir.getAbsolutePath())
- pb.environment().put("SPARK_LOCAL_IP", "127.0.0.1")
-
- val child = pb.start()
-
- // Wait for the config file to show up before returning, so that dependent services
- // can see the configuration. Exit early if the process dies.
- eventually(timeout(30 seconds), interval(100 millis)) {
- assert(configFile.isFile(), s"$simpleName hasn't started yet.")
-
- try {
- val exitCode = child.exitValue()
- throw new IOException(s"Child process exited unexpectedly (exit code $exitCode)")
- } catch {
- case _: IllegalThreadStateException => // Try again.
- }
- }
-
- ProcessInfo(child, logFile)
- }
-
- private def stop(svc: ProcessInfo): Unit = {
- svc.process.destroy()
- svc.process.waitFor()
- }
-
-}
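The mini cluster sizes itself from the cluster spec parsed by MiniClusterConfig above; every key is optional and defaults to 1. A sample spec, passed via -Dcluster.spec=<path> as handled in Cluster.scala (the counts here are illustrative):

    cluster.type=mini
    yarn.nm-count=2
    yarn.local-dir-count=1
    yarn.log-dir-count=1
    hdfs.dn-count=2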
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/RealCluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/RealCluster.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/RealCluster.scala
deleted file mode 100644
index 9c79829..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/RealCluster.scala
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.framework
-
-import java.io.{File, IOException}
-import java.nio.file.Files
-import java.security.PrivilegedExceptionAction
-import javax.servlet.http.HttpServletResponse._
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.sys.process._
-import scala.util.Random
-
-import com.decodified.scalassh._
-import com.ning.http.client.AsyncHttpClient
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.security.UserGroupInformation
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy.{LivyConf, Logging}
-
-private class RealClusterConfig(config: Map[String, String]) {
- val ip = config("ip")
-
- val sshLogin = config("ssh.login")
- val sshPubKey = config("ssh.pubkey")
- val livyPort = config.getOrElse(LivyConf.SERVER_PORT.key, "8998").toInt
- val livyClasspath = config.getOrElse("livy.classpath", "")
-
- val deployLivy = config.getOrElse("deploy-livy", "true").toBoolean
- val noDeployLivyHome = config.get("livy-home")
-
- val sparkHome = config("env.spark_home")
- val sparkConf = config.getOrElse("env.spark_conf", "/etc/spark/conf")
- val hadoopConf = config.getOrElse("env.hadoop_conf", "/etc/hadoop/conf")
-
- val javaHome = config.getOrElse("env.java_home", "/usr/java/default")
-}
-
-class RealCluster(_config: Map[String, String])
- extends Cluster with ClusterUtils with Logging {
-
- private val config = new RealClusterConfig(_config)
-
- private var livyIsRunning = false
- private var livyHomePath: String = _
- private var livyEpoch = 0
-
- private var _configDir: File = _
-
- private var hdfsScratch: Path = _
-
- private var sparkConfDir: String = _
- private var tempDirPath: String = _
- private var _hasSparkR: Boolean = _
-
- override def isRealSpark(): Boolean = true
-
- override def hasSparkR(): Boolean = _hasSparkR
-
- override def configDir(): File = _configDir
-
- override def hdfsScratchDir(): Path = hdfsScratch
-
- override def doAsClusterUser[T](task: => T): T = {
- val user = UserGroupInformation.createRemoteUser(config.sshLogin)
- user.doAs(new PrivilegedExceptionAction[T] {
- override def run(): T = task
- })
- }
-
- private def sshClient[T](body: SshClient => SSH.Result[T]): T = {
- val sshLogin = PublicKeyLogin(
- config.sshLogin, None, config.sshPubKey :: Nil)
- val hostConfig = HostConfig(login = sshLogin, hostKeyVerifier = HostKeyVerifiers.DontVerify)
- SSH(config.ip, hostConfig)(body) match {
- case Left(err) => throw new IOException(err)
- case Right(result) => result
- }
- }
-
- private def exec(cmd: String): CommandResult = {
- info(s"Running command: $cmd")
- val result = sshClient(_.exec(cmd))
- result.exitCode match {
- case Some(ec) if ec > 0 =>
- throw new IOException(s"Command '$cmd' failed: $ec\n" +
- s"stdout: ${result.stdOutAsString()}\n" +
- s"stderr: ${result.stdErrAsString()}\n")
- case _ =>
- }
- result
- }
-
- private def upload(local: String, remote: String): Unit = {
- info(s"Uploading local path $local")
- sshClient(_.upload(local, remote))
- }
-
- private def download(remote: String, local: String): Unit = {
- info(s"Downloading remote path $remote")
- sshClient(_.download(remote, local))
- }
-
- override def deploy(): Unit = {
- // Make sure Livy is not running.
- stopLivy()
-
- // Check whether SparkR is supported in YARN (needs the sparkr.zip archive).
- _hasSparkR = try {
- exec(s"test -f ${config.sparkHome}/R/lib/sparkr.zip")
- true
- } catch {
- case e: IOException => false
- }
-
- // Copy the remote Hadoop configuration to a local temp dir so that tests can use it to
- // talk to HDFS and YARN.
- val localTemp = new File(sys.props("java.io.tmpdir") + File.separator + "hadoop-conf")
- download(config.hadoopConf, localTemp.getAbsolutePath())
- _configDir = localTemp
-
- // Create a temp directory where test files will be written.
- tempDirPath = s"/tmp/livy-it-${Random.alphanumeric.take(16).mkString}"
- exec(s"mkdir -p $tempDirPath")
-
- // Also create an HDFS scratch directory for tests.
- doAsClusterUser {
- hdfsScratch = fs.makeQualified(
- new Path(s"/tmp/livy-it-${Random.alphanumeric.take(16).mkString}"))
- fs.mkdirs(hdfsScratch)
- fs.setPermission(hdfsScratch, new FsPermission("777"))
- }
-
- // Create a copy of the Spark configuration, and make sure the master is "yarn-cluster".
- sparkConfDir = s"$tempDirPath/spark-conf"
- val sparkProps = s"$sparkConfDir/spark-defaults.conf"
- Seq(
- s"cp -Lr ${config.sparkConf} $sparkConfDir",
- s"touch $sparkProps",
- s"sed -i.old '/spark.master.*/d' $sparkProps",
- s"sed -i.old '/spark.submit.deployMode.*/d' $sparkProps",
- s"echo 'spark.master=yarn-cluster' >> $sparkProps"
- ).foreach(exec)
-
- if (config.deployLivy) {
- try {
- info(s"Deploying Livy to ${config.ip}...")
- val version = sys.props("project.version")
- val assemblyZip = new File(s"../assembly/target/livy-server-$version.zip")
- assert(assemblyZip.isFile,
- s"Can't find livy assembly zip at ${assemblyZip.getCanonicalPath}")
- val assemblyName = assemblyZip.getName()
-
- // SSH to the node to unzip and install Livy.
- upload(assemblyZip.getCanonicalPath, s"$tempDirPath/$assemblyName")
- exec(s"unzip -o $tempDirPath/$assemblyName -d $tempDirPath")
- livyHomePath = s"$tempDirPath/livy-server-$version"
- info(s"Deployed Livy to ${config.ip} at $livyHomePath.")
- } catch {
- case e: Exception =>
- error(s"Failed to deploy Livy to ${config.ip}.", e)
- cleanUp()
- throw e
- }
- } else {
- livyHomePath = config.noDeployLivyHome.get
- info("Skipping deployment.")
- }
-
- runLivy()
- }
-
- override def cleanUp(): Unit = {
- stopLivy()
- if (tempDirPath != null) {
- exec(s"rm -rf $tempDirPath")
- }
- if (hdfsScratch != null) {
- doAsClusterUser {
- // Cannot use the shared `fs` since this runs in a shutdown hook, and that instance
- // may have been closed already.
- val fs = FileSystem.newInstance(hadoopConf)
- try {
- fs.delete(hdfsScratch, true)
- } finally {
- fs.close()
- }
- }
- }
- }
-
- override def runLivy(): Unit = synchronized {
- assert(!livyIsRunning, "Livy is running already.")
-
- val livyConf = Map(
- "livy.server.port" -> config.livyPort.toString,
- // "livy.server.recovery.mode=local",
- "livy.environment" -> "development",
- LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
- LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster")
- val livyConfFile = File.createTempFile("livy.", ".properties")
- saveProperties(livyConf, livyConfFile)
- upload(livyConfFile.getAbsolutePath(), s"$livyHomePath/conf/livy.conf")
-
- val env = Map(
- "JAVA_HOME" -> config.javaHome,
- "HADOOP_CONF_DIR" -> config.hadoopConf,
- "SPARK_CONF_DIR" -> sparkConfDir,
- "SPARK_HOME" -> config.sparkHome,
- "CLASSPATH" -> config.livyClasspath,
- "LIVY_PID_DIR" -> s"$tempDirPath/pid",
- "LIVY_LOG_DIR" -> s"$tempDirPath/logs",
- "LIVY_MAX_LOG_FILES" -> "16",
- "LIVY_IDENT_STRING" -> "it"
- )
- val livyEnvFile = File.createTempFile("livy-env.", ".sh")
- saveProperties(env, livyEnvFile)
- upload(livyEnvFile.getAbsolutePath(), s"$livyHomePath/conf/livy-env.sh")
-
- info(s"Starting Livy @ port ${config.livyPort}...")
- exec(s"env -i $livyHomePath/bin/livy-server start")
- livyIsRunning = true
-
- val httpClient = new AsyncHttpClient()
- eventually(timeout(1 minute), interval(1 second)) {
- assert(httpClient.prepareGet(livyEndpoint).execute().get().getStatusCode == SC_OK)
- }
- info(s"Started Livy.")
- }
-
- override def stopLivy(): Unit = synchronized {
- info("Stopping Livy Server")
- try {
- exec(s"$livyHomePath/bin/livy-server stop")
- } catch {
- case e: Exception =>
- if (livyIsRunning) {
- throw e
- }
- }
-
- if (livyIsRunning) {
- // Wait a tiny bit so that the process finishes closing its output files.
- Thread.sleep(2)
-
- livyEpoch += 1
- val logName = "livy-it-server.out"
- val localName = s"livy-it-server-$livyEpoch.out"
- val logPath = s"$tempDirPath/logs/$logName"
- val localLog = sys.props("java.io.tmpdir") + File.separator + localName
- download(logPath, localLog)
- info(s"Log for epoch $livyEpoch available at $localLog")
- }
- }
-
- override def livyEndpoint: String = s"http://${config.ip}:${config.livyPort}"
-}
-
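RealCluster is configured entirely through the cluster spec; RealClusterConfig above lists the recognized keys. A hypothetical spec for testing against an existing cluster (every value below is a placeholder):

    cluster.type=real
    ip=192.168.1.10
    ssh.login=testuser
    ssh.pubkey=/home/testuser/.ssh/id_rsa
    env.spark_home=/opt/spark
    env.hadoop_conf=/etc/hadoop/conf
    deploy-livy=true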
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/BaseIntegrationTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/BaseIntegrationTestSuite.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/BaseIntegrationTestSuite.scala
new file mode 100644
index 0000000..d770e7e
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/BaseIntegrationTestSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.framework
+
+import java.io.File
+import java.util.UUID
+
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import com.ning.http.client.AsyncHttpClient
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.scalatest._
+
+abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with BeforeAndAfterAll {
+ import scala.concurrent.ExecutionContext.Implicits.global
+
+ var cluster: Cluster = _
+ var httpClient: AsyncHttpClient = _
+ var livyClient: LivyRestClient = _
+
+ protected def livyEndpoint: String = cluster.livyEndpoint
+
+ protected val testLib = sys.props("java.class.path")
+ .split(File.pathSeparator)
+ .find(new File(_).getName().startsWith("livy-test-lib-"))
+ .getOrElse(throw new Exception(s"Cannot find test lib in ${sys.props("java.class.path")}"))
+
+ protected def getYarnLog(appId: String): String = {
+ require(appId != null, "appId shouldn't be null")
+
+ val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
+ assert(appReport != null, "appReport shouldn't be null")
+
+ appReport.getDiagnostics()
+ }
+
+ protected def restartLivy(): Unit = {
+ val f = future {
+ cluster.stopLivy()
+ cluster.runLivy()
+ }
+ Await.result(f, 3 minutes)
+ }
+
+ /** Uploads a file to HDFS and returns just its path. */
+ protected def uploadToHdfs(file: File): String = {
+ val hdfsPath = new Path(cluster.hdfsScratchDir(),
+ UUID.randomUUID().toString() + "-" + file.getName())
+ cluster.fs.copyFromLocalFile(new Path(file.toURI()), hdfsPath)
+ hdfsPath.toUri().getPath()
+ }
+
+ /** Wrapper around test() to be used by pyspark tests. */
+ protected def pytest(desc: String)(testFn: => Unit): Unit = {
+ test(desc) {
+ assume(cluster.isRealSpark(), "PySpark tests require a real Spark installation.")
+ testFn
+ }
+ }
+
+ /** Wrapper around test() to be used by SparkR tests. */
+ protected def rtest(desc: String)(testFn: => Unit): Unit = {
+ test(desc) {
+ assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
+ assume(cluster.isRealSpark(), "SparkR tests require a real Spark installation.")
+ assume(cluster.hasSparkR(), "Spark under test does not support R.")
+ testFn
+ }
+ }
+
+ /** Clean up session and show info when test fails. */
+ protected def withSession[S <: LivyRestClient#Session, R]
+ (s: S)
+ (f: (S) => R): R = {
+ try {
+ f(s)
+ } catch {
+ case NonFatal(e) =>
+ try {
+ val state = s.snapshot()
+ info(s"Final session state: $state")
+ state.appId.foreach { id => info(s"YARN diagnostics: ${getYarnLog(id)}") }
+ } catch { case NonFatal(_) => }
+ throw e
+ } finally {
+ try {
+ s.stop()
+ } catch {
+ case NonFatal(e) => alert(s"Failed to stop session: $e")
+ }
+ }
+ }
+
+ // We need beforeAll() here because BatchIT's beforeAll() has to be executed after this.
+ // Please create an issue if this breaks test logging for cluster creation.
+ protected override def beforeAll() = {
+ cluster = Cluster.get()
+ httpClient = new AsyncHttpClient()
+ livyClient = new LivyRestClient(httpClient, livyEndpoint)
+ }
+}
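Concrete suites extend this base class and lean on withSession for cleanup and diagnostics. A sketch of what such a suite might look like; the main class name is a placeholder and not part of livy-test-lib:

    import java.io.File

    import org.apache.livy.test.framework.BaseIntegrationTestSuite

    class ExampleIT extends BaseIntegrationTestSuite {
      test("batch session finishes successfully") {
        // testLib is the livy-test-lib jar located on the classpath by the base class.
        val jar = uploadToHdfs(new File(testLib))
        val session = livyClient.startBatch(jar, Some("com.example.TestApp"), List(), Map())
        withSession(session) { s =>
          s.verifySessionSuccess()
        }
      }
    }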
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/Cluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/Cluster.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/Cluster.scala
new file mode 100644
index 0000000..e1b6844
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/Cluster.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.framework
+
+import java.io._
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.client.api.YarnClient
+
+import org.apache.livy.Logging
+
+/**
+ * A common interface for running tests on a real cluster or a mini cluster.
+ */
+trait Cluster {
+ def deploy(): Unit
+ def cleanUp(): Unit
+ def configDir(): File
+ def isRealSpark(): Boolean
+ def hasSparkR(): Boolean
+
+ def runLivy(): Unit
+ def stopLivy(): Unit
+ def livyEndpoint: String
+ def hdfsScratchDir(): Path
+
+ def doAsClusterUser[T](task: => T): T
+
+ lazy val hadoopConf = {
+ val conf = new Configuration(false)
+ configDir().listFiles().foreach { f =>
+ if (f.getName().endsWith(".xml")) {
+ conf.addResource(new Path(f.toURI()))
+ }
+ }
+ conf
+ }
+
+ lazy val yarnConf = {
+ val conf = new Configuration(false)
+ conf.addResource(new Path(s"${configDir().getCanonicalPath}/yarn-site.xml"))
+ conf
+ }
+
+ lazy val fs = doAsClusterUser {
+ FileSystem.get(hadoopConf)
+ }
+
+ lazy val yarnClient = doAsClusterUser {
+ val c = YarnClient.createYarnClient()
+ c.init(yarnConf)
+ c.start()
+ c
+ }
+}
+
+object Cluster extends Logging {
+ private val CLUSTER_TYPE = "cluster.type"
+
+ private lazy val config = {
+ sys.props.get("cluster.spec")
+ .filter { path => path.nonEmpty && path != "default" }
+ .map { path =>
+ val in = Option(getClass.getClassLoader.getResourceAsStream(path))
+ .getOrElse(new FileInputStream(path))
+ val p = new Properties()
+ val reader = new InputStreamReader(in, UTF_8)
+ try {
+ p.load(reader)
+ } finally {
+ reader.close()
+ }
+ p.asScala.toMap
+ }
+ .getOrElse(Map(CLUSTER_TYPE -> "mini"))
+ }
+
+ private lazy val cluster = {
+ var _cluster: Cluster = null
+ try {
+ _cluster = config.get(CLUSTER_TYPE) match {
+ case Some("real") => new RealCluster(config)
+ case Some("mini") => new MiniCluster(config)
+ case t => throw new Exception(s"Unknown or unset cluster.type $t")
+ }
+ Runtime.getRuntime.addShutdownHook(new Thread {
+ override def run(): Unit = {
+ info("Shutting down cluster pool.")
+ _cluster.cleanUp()
+ }
+ })
+ _cluster.deploy()
+ } catch {
+ case e: Throwable =>
+ error("Failed to initialize cluster.", e)
+ Option(_cluster).foreach { c =>
+ Try(c.cleanUp()).recover { case e =>
+ error("Furthermore, failed to clean up cluster after failure.", e)
+ }
+ }
+ throw e
+ }
+ _cluster
+ }
+
+ def get(): Cluster = cluster
+
+ def isRunningOnTravis: Boolean = sys.env.contains("TRAVIS")
+}
+
+trait ClusterUtils {
+
+ protected def saveProperties(props: Map[String, String], dest: File): Unit = {
+ val jprops = new Properties()
+ props.foreach { case (k, v) => jprops.put(k, v) }
+
+ val tempFile = new File(dest.getAbsolutePath() + ".tmp")
+ val out = new OutputStreamWriter(new FileOutputStream(tempFile), UTF_8)
+ try {
+ jprops.store(out, "Configuration")
+ } finally {
+ out.close()
+ }
+ tempFile.renameTo(dest)
+ }
+
+ protected def loadProperties(file: File): Map[String, String] = {
+ val in = new InputStreamReader(new FileInputStream(file), UTF_8)
+ val props = new Properties()
+ try {
+ props.load(in)
+ } finally {
+ in.close()
+ }
+ props.asScala.toMap
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
new file mode 100644
index 0000000..6d319c7
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.test.framework
+
+import java.util.regex.Pattern
+import javax.servlet.http.HttpServletResponse
+
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Either, Left, Right}
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.ning.http.client.AsyncHttpClient
+import com.ning.http.client.Response
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.server.batch.CreateBatchRequest
+import org.apache.livy.server.interactive.CreateInteractiveRequest
+import org.apache.livy.sessions.{Kind, SessionKindModule, SessionState}
+import org.apache.livy.utils.AppInfo
+
+object LivyRestClient {
+ private val BATCH_TYPE = "batches"
+ private val INTERACTIVE_TYPE = "sessions"
+
+ // TODO Define these in production code and share them with test code.
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ private case class StatementResult(id: Int, state: String, output: Map[String, Any])
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ case class StatementError(ename: String, evalue: String, stackTrace: Seq[String])
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ case class SessionSnapshot(
+ id: Int,
+ appId: Option[String],
+ state: String,
+ appInfo: AppInfo,
+ log: IndexedSeq[String])
+}
+
+class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) {
+ import LivyRestClient._
+
+ val mapper = new ObjectMapper()
+ .registerModule(DefaultScalaModule)
+ .registerModule(new SessionKindModule())
+
+ class Session(val id: Int, sessionType: String) {
+ val url: String = s"$livyEndpoint/$sessionType/$id"
+
+ def appId(): ApplicationId = {
+ ConverterUtils.toApplicationId(snapshot().appId.get)
+ }
+
+ def snapshot(): SessionSnapshot = {
+ val r = httpClient.prepareGet(url).execute().get()
+ assertStatusCode(r, HttpServletResponse.SC_OK)
+
+ mapper.readValue(r.getResponseBodyAsStream, classOf[SessionSnapshot])
+ }
+
+ def stop(): Unit = {
+ httpClient.prepareDelete(url).execute().get()
+
+ eventually(timeout(30 seconds), interval(1 second)) {
+ verifySessionDoesNotExist()
+ }
+ }
+
+ def verifySessionState(state: SessionState): Unit = {
+ verifySessionState(Set(state))
+ }
+
+ def verifySessionState(states: Set[SessionState]): Unit = {
+ val t = if (Cluster.isRunningOnTravis) 5.minutes else 2.minutes
+ val strStates = states.map(_.toString)
+ // Travis uses very slow VMs, so a longer timeout is needed.
+ // Keeping the original timeout to avoid slowing down local development.
+ eventually(timeout(t), interval(1 second)) {
+ val s = snapshot().state
+ assert(strStates.contains(s), s"Session $id state $s doesn't equal one of $strStates")
+ }
+ }
+
+ def verifySessionDoesNotExist(): Unit = {
+ val r = httpClient.prepareGet(url).execute().get()
+ assertStatusCode(r, HttpServletResponse.SC_NOT_FOUND)
+ }
+ }
+
+ class BatchSession(id: Int) extends Session(id, BATCH_TYPE) {
+ def verifySessionDead(): Unit = verifySessionState(SessionState.Dead())
+ def verifySessionRunning(): Unit = verifySessionState(SessionState.Running())
+ def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success())
+ }
+
+ class InteractiveSession(id: Int) extends Session(id, INTERACTIVE_TYPE) {
+ class Statement(code: String) {
+ val stmtId = {
+ val requestBody = Map("code" -> code)
+ val r = httpClient.preparePost(s"$url/statements")
+ .setBody(mapper.writeValueAsString(requestBody))
+ .execute()
+ .get()
+ assertStatusCode(r, HttpServletResponse.SC_CREATED)
+
+ val newStmt = mapper.readValue(r.getResponseBodyAsStream, classOf[StatementResult])
+ newStmt.id
+ }
+
+ final def result(): Either[String, StatementError] = {
+ eventually(timeout(1 minute), interval(1 second)) {
+ val r = httpClient.prepareGet(s"$url/statements/$stmtId")
+ .execute()
+ .get()
+ assertStatusCode(r, HttpServletResponse.SC_OK)
+
+ val newStmt = mapper.readValue(r.getResponseBodyAsStream, classOf[StatementResult])
+ assert(newStmt.state == "available", s"Statement isn't available: ${newStmt.state}")
+
+ val output = newStmt.output
+ output.get("status") match {
+ case Some("ok") =>
+ val data = output("data").asInstanceOf[Map[String, Any]]
+ // Prefer the Livy table representation when present; otherwise fall back to plain text.
+ val rst = data.get("application/vnd.livy.table.v1+json") match {
+ case Some(magicRst) => mapper.writeValueAsString(magicRst)
+ case None => data.getOrElse("text/plain", "").asInstanceOf[String]
+ }
+ Left(rst)
+ case Some("error") => Right(mapper.convertValue(output, classOf[StatementError]))
+ case Some(status) =>
+ throw new IllegalStateException(s"Unknown statement $stmtId status: $status")
+ case None =>
+ throw new IllegalStateException(s"Unknown statement $stmtId output: $newStmt")
+ }
+ }
+ }
+
+ def verifyResult(expectedRegex: String): Unit = {
+ result() match {
+ case Left(result) =>
+ if (expectedRegex != null) {
+ matchStrings(result, expectedRegex)
+ }
+ case Right(error) =>
+ assert(false, s"Got error from statement $stmtId $code: ${error.evalue}")
+ }
+ }
+
+ def verifyError(
+ ename: String = null, evalue: String = null, stackTrace: String = null): Unit = {
+ result() match {
+ case Left(_) =>
+ assert(false, s"Statement $stmtId `$code` expected to fail, but succeeded.")
+ case Right(error) =>
+ val remoteStack = Option(error.stackTrace).getOrElse(Nil).mkString("\n")
+ Seq(error.ename -> ename, error.evalue -> evalue, remoteStack -> stackTrace).foreach {
+ case (actual, expected) if expected != null => matchStrings(actual, expected)
+ case _ =>
+ }
+ }
+ }
+
+ private def matchStrings(actual: String, expected: String): Unit = {
+ val regex = Pattern.compile(expected, Pattern.DOTALL)
+ assert(regex.matcher(actual).matches(), s"$actual did not match regex $expected")
+ }
+ }
+
+ def run(code: String): Statement = { new Statement(code) }
+
+ def runFatalStatement(code: String): Unit = {
+ val requestBody = Map("code" -> code)
+ // Don't block on the response: the statement is expected to kill the session,
+ // so the request may never complete. Just verify the session ends up dead.
+ httpClient.preparePost(s"$url/statements")
+ .setBody(mapper.writeValueAsString(requestBody))
+ .execute()
+
+ verifySessionState(SessionState.Dead())
+ }
+
+ def verifySessionIdle(): Unit = {
+ verifySessionState(SessionState.Idle())
+ }
+ }
+
+ def startBatch(
+ file: String,
+ className: Option[String],
+ args: List[String],
+ sparkConf: Map[String, String]): BatchSession = {
+ val r = new CreateBatchRequest()
+ r.file = file
+ r.className = className
+ r.args = args
+ r.conf = Map("spark.yarn.maxAppAttempts" -> "1") ++ sparkConf
+
+ val id = start(BATCH_TYPE, mapper.writeValueAsString(r))
+ new BatchSession(id)
+ }
+
+ def startSession(
+ kind: Kind,
+ sparkConf: Map[String, String],
+ heartbeatTimeoutInSecond: Int): InteractiveSession = {
+ val r = new CreateInteractiveRequest()
+ r.kind = kind
+ r.conf = sparkConf
+ r.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond
+
+ val id = start(INTERACTIVE_TYPE, mapper.writeValueAsString(r))
+ new InteractiveSession(id)
+ }
+
+ def connectSession(id: Int): InteractiveSession = { new InteractiveSession(id) }
+
+ private def start(sessionType: String, body: String): Int = {
+ val r = httpClient.preparePost(s"$livyEndpoint/$sessionType")
+ .setBody(body)
+ .execute()
+ .get()
+
+ assertStatusCode(r, HttpServletResponse.SC_CREATED)
+
+ val newSession = mapper.readValue(r.getResponseBodyAsStream, classOf[SessionSnapshot])
+ newSession.id
+ }
+
+ private def assertStatusCode(r: Response, expected: Int): Unit = {
+ def pretty(r: Response): String = {
+ s"${r.getStatusCode} ${r.getResponseBody}"
+ }
+ assert(r.getStatusCode() == expected, s"HTTP status code != $expected: ${pretty(r)}")
+ }
+}
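
Taken together, a short sketch of how a test could drive this client end to end. The endpoint, session kind (Spark() from org.apache.livy.sessions), and regexes here are illustrative; which patterns actually match depends on the interpreter output:

    val client = new LivyRestClient(new AsyncHttpClient(), "http://localhost:8998")
    val session = client.startSession(Spark(), Map.empty[String, String], heartbeatTimeoutInSecond = 0)
    try {
      session.verifySessionIdle()
      // run() submits the statement; result()/verifyResult() poll until it is available.
      session.run("1 + 1").verifyResult(".*res0.*2.*")
      session.run("throw new RuntimeException").verifyError(evalue = ".*RuntimeException.*")
    } finally {
      session.stop()
    }
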
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala
new file mode 100644
index 0000000..005a3e9
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.framework
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import javax.servlet.http.HttpServletResponse
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.ning.http.client.AsyncHttpClient
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.apache.spark.launcher.SparkLauncher
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.client.common.TestUtils
+import org.apache.livy.server.LivyServer
+
+private class MiniClusterConfig(val config: Map[String, String]) {
+
+ val nmCount = getInt("yarn.nm-count", 1)
+ val localDirCount = getInt("yarn.local-dir-count", 1)
+ val logDirCount = getInt("yarn.log-dir-count", 1)
+ val dnCount = getInt("hdfs.dn-count", 1)
+
+ private def getInt(key: String, default: Int): Int = {
+ config.get(key).map(_.toInt).getOrElse(default)
+ }
+
+}
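
Spelled out, the keys this class understands amount to the following (a hedged, in-code sketch; the values shown just restate the defaults above):

    // Equivalent in-code form of a cluster.conf that overrides nothing.
    val sampleConfig = Map(
      "yarn.nm-count" -> "1",
      "yarn.local-dir-count" -> "1",
      "yarn.log-dir-count" -> "1",
      "hdfs.dn-count" -> "1")
    assert(new MiniClusterConfig(sampleConfig).dnCount == 1)
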
+
+sealed trait MiniClusterUtils extends ClusterUtils {
+ private val livySparkScalaVersionEnvVarName = "LIVY_SPARK_SCALA_VERSION"
+
+ protected def getSparkScalaVersion(): String = {
+ sys.env.getOrElse(livySparkScalaVersionEnvVarName, {
+ throw new RuntimeException(s"Please specify env var $livySparkScalaVersionEnvVarName.")
+ })
+ }
+
+ protected def saveConfig(conf: Configuration, dest: File): Unit = {
+ val redacted = new Configuration(conf)
+ // This setting references a test class that is not available when using a real Spark
+ // installation, so remove it from client configs.
+ redacted.unset("net.topology.node.switch.mapping.impl")
+
+ val out = new FileOutputStream(dest)
+ try {
+ redacted.writeXml(out)
+ } finally {
+ out.close()
+ }
+ }
+
+}
+
+sealed abstract class MiniClusterBase extends MiniClusterUtils with Logging {
+
+ def main(args: Array[String]): Unit = {
+ val klass = getClass().getSimpleName().stripSuffix("$")
+
+ info(s"$klass is starting up.")
+
+ val Array(configPath) = args
+ val config = {
+ val file = new File(s"$configPath/cluster.conf")
+ val props = loadProperties(file)
+ new MiniClusterConfig(props)
+ }
+ start(config, configPath)
+
+ info(s"$klass running.")
+
+ while (true) synchronized {
+ wait()
+ }
+ }
+
+ protected def start(config: MiniClusterConfig, configPath: String): Unit
+
+}
+
+object MiniHdfsMain extends MiniClusterBase {
+
+ override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
+ val hadoopConf = new Configuration()
+ val hdfsCluster = new MiniDFSCluster.Builder(hadoopConf)
+ .numDataNodes(config.dnCount)
+ .format(true)
+ .waitSafeMode(true)
+ .build()
+
+ hdfsCluster.waitActive()
+
+ saveConfig(hadoopConf, new File(configPath + "/core-site.xml"))
+ }
+
+}
+
+object MiniYarnMain extends MiniClusterBase {
+
+ override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
+ val baseConfig = new YarnConfiguration()
+ val yarnCluster = new MiniYARNCluster(getClass().getName(), config.nmCount,
+ config.localDirCount, config.logDirCount)
+ yarnCluster.init(baseConfig)
+
+ // Install a shutdown hook to stop the service and kill all running applications.
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ override def run(): Unit = yarnCluster.stop()
+ })
+
+ yarnCluster.start()
+
+ // Workaround for YARN-2642.
+ val yarnConfig = yarnCluster.getConfig()
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ assert(yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0",
+ "RM not up yes.")
+ }
+
+ info(s"RM address in configuration is ${yarnConfig.get(YarnConfiguration.RM_ADDRESS)}")
+ saveConfig(yarnConfig, new File(configPath + "/yarn-site.xml"))
+ }
+
+}
+
+object MiniLivyMain extends MiniClusterBase {
+ var livyUrl: Option[String] = None
+
+ def start(config: MiniClusterConfig, configPath: String): Unit = {
+ var livyConf = Map(
+ LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
+ LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
+ LivyConf.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(),
+ LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
+ LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
+ LivyConf.RECOVERY_MODE.key -> "recovery",
+ LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
+ LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store")
+
+ if (Cluster.isRunningOnTravis) {
+ livyConf ++= Map("livy.server.yarn.app-lookup-timeout" -> "2m")
+ }
+
+ saveProperties(livyConf, new File(configPath + "/livy.conf"))
+
+ val server = new LivyServer()
+ server.start()
+ server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, true)
+ // Write a serverUrl.conf file to the conf directory with the location of the Livy
+ // server. Do it atomically since it's used by MiniCluster to detect when the Livy server
+ // is up and ready.
+ eventually(timeout(30 seconds), interval(1 second)) {
+ val serverUrlConf = Map("livy.server.server-url" -> server.serverUrl())
+ saveProperties(serverUrlConf, new File(configPath + "/serverUrl.conf"))
+ }
+ }
+}
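
The serverUrl.conf written above is one half of a small file-based readiness handshake; the consumer simply polls for the file and reads the endpoint back. A sketch of that side (MiniCluster.runLivy below does essentially this; loadProperties comes from ClusterUtils, which is outside this diff):

    val confFile = new File(configPath, "serverUrl.conf")
    eventually(timeout(30 seconds), interval(1 second)) {
      assert(confFile.isFile(), "Livy server hasn't published its URL yet.")
    }
    val livyUrl = loadProperties(confFile)("livy.server.server-url")
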
+
+private case class ProcessInfo(process: Process, logFile: File)
+
+/**
+ * Cluster implementation that uses HDFS / YARN mini clusters running as sub-processes locally.
+ * Launching Livy through this mini cluster results in three child processes:
+ *
+ * - A HDFS mini cluster
+ * - A YARN mini cluster
+ * - The Livy server
+ *
+ * Each service will write its client configuration to a temporary directory managed by the
+ * framework, so that applications can connect to the services.
+ *
+ * TODO: add support for MiniKdc.
+ */
+class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterUtils with Logging {
+ private val tempDir = new File(s"${sys.props("java.io.tmpdir")}/livy-int-test")
+ private var sparkConfDir: File = _
+ private var _configDir: File = _
+ private var hdfs: Option[ProcessInfo] = None
+ private var yarn: Option[ProcessInfo] = None
+ private var livy: Option[ProcessInfo] = None
+ private var livyUrl: String = _
+ private var _hdfsScratchDir: Path = _
+
+ override def configDir(): File = _configDir
+
+ override def hdfsScratchDir(): Path = _hdfsScratchDir
+
+ override def isRealSpark(): Boolean = {
+ new File(sys.env("SPARK_HOME") + File.separator + "RELEASE").isFile()
+ }
+
+ override def hasSparkR(): Boolean = {
+ val path = Seq(sys.env("SPARK_HOME"), "R", "lib", "sparkr.zip").mkString(File.separator)
+ new File(path).isFile()
+ }
+
+ override def doAsClusterUser[T](task: => T): T = task
+
+ // Explicitly remove the "test-lib" dependency from the classpath of child processes. We
+ // want tests to explicitly upload this jar when necessary, to test those code paths.
+ private val childClasspath = {
+ val cp = sys.props("java.class.path").split(File.pathSeparator)
+ val filtered = cp.filter { path => !new File(path).getName().startsWith("livy-test-lib-") }
+ assert(cp.size != filtered.size, "livy-test-lib jar not found in classpath!")
+ filtered.mkString(File.pathSeparator)
+ }
+
+ override def deploy(): Unit = {
+ if (tempDir.exists()) {
+ FileUtils.deleteQuietly(tempDir)
+ }
+ assert(tempDir.mkdir(), "Cannot create temp test dir.")
+ sparkConfDir = mkdir("spark-conf")
+
+ // When running a real Spark cluster, don't set the classpath.
+ val extraCp = if (!isRealSpark()) {
+ val sparkScalaVersion = getSparkScalaVersion()
+ val classPathFile =
+ new File(s"minicluster-dependencies/scala-$sparkScalaVersion/target/classpath")
+ assert(classPathFile.isFile,
+ s"Cannot read MiniCluster classpath file: ${classPathFile.getCanonicalPath}")
+ val sparkClassPath =
+ FileUtils.readFileToString(classPathFile, Charset.defaultCharset())
+
+ val dummyJar = Files.createTempFile(Paths.get(tempDir.toURI), "dummy", "jar").toFile
+ Map(
+ SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sparkClassPath,
+ SparkLauncher.EXECUTOR_EXTRA_CLASSPATH -> sparkClassPath,
+ // Needed for Spark 2.0: in YARN mode it uploads the jars specified here to the
+ // distributed cache, and if this is unset it looks for a jars folder, which does
+ // not exist in this setup and would cause an exception.
+ "spark.yarn.jars" -> dummyJar.getAbsolutePath)
+ } else {
+ Map()
+ }
+
+ val sparkConf = extraCp ++ Map(
+ "spark.executor.instances" -> "1",
+ "spark.scheduler.minRegisteredResourcesRatio" -> "0.0",
+ "spark.ui.enabled" -> "false",
+ SparkLauncher.DRIVER_MEMORY -> "512m",
+ SparkLauncher.EXECUTOR_MEMORY -> "512m",
+ SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console",
+ SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console"
+ )
+ saveProperties(sparkConf, new File(sparkConfDir, "spark-defaults.conf"))
+
+ _configDir = mkdir("hadoop-conf")
+ saveProperties(config, new File(configDir, "cluster.conf"))
+ hdfs = Some(start(MiniHdfsMain.getClass, new File(configDir, "core-site.xml")))
+ yarn = Some(start(MiniYarnMain.getClass, new File(configDir, "yarn-site.xml")))
+ runLivy()
+
+ _hdfsScratchDir = fs.makeQualified(new Path("/"))
+ }
+
+ override def cleanUp(): Unit = {
+ Seq(hdfs, yarn, livy).flatten.foreach(stop)
+ hdfs = None
+ yarn = None
+ livy = None
+ }
+
+ def runLivy(): Unit = {
+ assert(livy.isEmpty)
+ val confFile = new File(configDir, "serverUrl.conf")
+ val jacocoArgs = Option(TestUtils.getJacocoArgs())
+ .map { args =>
+ Seq(args, s"-Djacoco.args=$args")
+ }.getOrElse(Nil)
+ val localLivy = start(MiniLivyMain.getClass, confFile, extraJavaArgs = jacocoArgs)
+
+ val props = loadProperties(confFile)
+ livyUrl = props("livy.server.server-url")
+
+ // Wait until Livy server responds.
+ val httpClient = new AsyncHttpClient()
+ eventually(timeout(30 seconds), interval(1 second)) {
+ val res = httpClient.prepareGet(livyUrl + "/metrics").execute().get()
+ assert(res.getStatusCode() == HttpServletResponse.SC_OK)
+ }
+
+ livy = Some(localLivy)
+ }
+
+ def stopLivy(): Unit = {
+ assert(livy.isDefined)
+ livy.foreach(stop)
+ livyUrl = null
+ livy = None
+ }
+
+ def livyEndpoint: String = livyUrl
+
+ private def mkdir(name: String, parent: File = tempDir): File = {
+ val dir = new File(parent, name)
+ if (!dir.exists()) {
+ assert(dir.mkdir(), s"Failed to create directory $name.")
+ }
+ dir
+ }
+
+ private def start(
+ klass: Class[_],
+ configFile: File,
+ extraJavaArgs: Seq[String] = Nil): ProcessInfo = {
+ val simpleName = klass.getSimpleName().stripSuffix("$")
+ val procDir = mkdir(simpleName)
+ val procTmp = mkdir("tmp", parent = procDir)
+
+ // Before starting anything, clean up any processes left over from previous runs.
+ sys.process.Process(s"pkill -f $simpleName") !
+
+ val java = sys.props("java.home") + "/bin/java"
+ val cmd =
+ Seq(
+ java,
+ "-Dtest.appender=console",
+ "-Djava.io.tmpdir=" + procTmp.getAbsolutePath(),
+ "-cp", childClasspath + File.pathSeparator + configDir.getAbsolutePath(),
+ "-XX:MaxPermSize=256m") ++
+ extraJavaArgs ++
+ Seq(
+ klass.getName().stripSuffix("$"),
+ configDir.getAbsolutePath())
+
+ val logFile = new File(procDir, "output.log")
+ val pb = new ProcessBuilder(cmd.toArray: _*)
+ .directory(procDir)
+ .redirectErrorStream(true)
+ .redirectOutput(ProcessBuilder.Redirect.appendTo(logFile))
+
+ pb.environment().put("LIVY_CONF_DIR", configDir.getAbsolutePath())
+ pb.environment().put("HADOOP_CONF_DIR", configDir.getAbsolutePath())
+ pb.environment().put("SPARK_CONF_DIR", sparkConfDir.getAbsolutePath())
+ pb.environment().put("SPARK_LOCAL_IP", "127.0.0.1")
+
+ val child = pb.start()
+
+ // Wait for the config file to show up before returning, so that dependent services
+ // can see the configuration. Exit early if the process dies.
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ assert(configFile.isFile(), s"$simpleName hasn't started yet.")
+
+ try {
+ val exitCode = child.exitValue()
+ throw new IOException(s"Child process exited unexpectedly (exit code $exitCode)")
+ } catch {
+ case _: IllegalThreadStateException => // Try again.
+ }
+ }
+
+ ProcessInfo(child, logFile)
+ }
+
+ private def stop(svc: ProcessInfo): Unit = {
+ svc.process.destroy()
+ svc.process.waitFor()
+ }
+
+}
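
End to end, a harness would drive this class roughly as follows (a minimal sketch; the config map and what runs between deploy() and cleanUp() are illustrative):

    val cluster = new MiniCluster(Map("yarn.nm-count" -> "1"))
    try {
      cluster.deploy() // starts HDFS, YARN and Livy as child processes
      val client = new LivyRestClient(new AsyncHttpClient(), cluster.livyEndpoint)
      // ... start sessions / batches against the mini cluster here ...
    } finally {
      cluster.cleanUp()
    }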