You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:58 UTC

[29/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index e32ca78..e15c470 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -20,14 +20,14 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-main</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
   </parent>
 
   <artifactId>livy-integration-test</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala
deleted file mode 100644
index 039ee0a..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.framework
-
-import java.io.File
-import java.util.UUID
-
-import scala.concurrent._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.control.NonFatal
-
-import com.ning.http.client.AsyncHttpClient
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.scalatest._
-
-abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with BeforeAndAfterAll {
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  var cluster: Cluster = _
-  var httpClient: AsyncHttpClient = _
-  var livyClient: LivyRestClient = _
-
-  protected def livyEndpoint: String = cluster.livyEndpoint
-
-  protected val testLib = sys.props("java.class.path")
-    .split(File.pathSeparator)
-    .find(new File(_).getName().startsWith("livy-test-lib-"))
-    .getOrElse(throw new Exception(s"Cannot find test lib in ${sys.props("java.class.path")}"))
-
-  protected def getYarnLog(appId: String): String = {
-    require(appId != null, "appId shouldn't be null")
-
-    val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
-    assert(appReport != null, "appReport shouldn't be null")
-
-    appReport.getDiagnostics()
-  }
-
-  protected def restartLivy(): Unit = {
-    val f = future {
-      cluster.stopLivy()
-      cluster.runLivy()
-    }
-    Await.result(f, 3 minutes)
-  }
-
-  /** Uploads a file to HDFS and returns just its path. */
-  protected def uploadToHdfs(file: File): String = {
-    val hdfsPath = new Path(cluster.hdfsScratchDir(),
-      UUID.randomUUID().toString() + "-" + file.getName())
-    cluster.fs.copyFromLocalFile(new Path(file.toURI()), hdfsPath)
-    hdfsPath.toUri().getPath()
-  }
-
-  /** Wrapper around test() to be used by pyspark tests. */
-  protected def pytest(desc: String)(testFn: => Unit): Unit = {
-    test(desc) {
-      assume(cluster.isRealSpark(), "PySpark tests require a real Spark installation.")
-      testFn
-    }
-  }
-
-  /** Wrapper around test() to be used by SparkR tests. */
-  protected def rtest(desc: String)(testFn: => Unit): Unit = {
-    test(desc) {
-      assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
-      assume(cluster.isRealSpark(), "SparkR tests require a real Spark installation.")
-      assume(cluster.hasSparkR(), "Spark under test does not support R.")
-      testFn
-    }
-  }
-
-  /** Clean up session and show info when test fails. */
-  protected def withSession[S <: LivyRestClient#Session, R]
-    (s: S)
-    (f: (S) => R): R = {
-    try {
-      f(s)
-    } catch {
-      case NonFatal(e) =>
-        try {
-          val state = s.snapshot()
-          info(s"Final session state: $state")
-          state.appId.foreach { id => info(s"YARN diagnostics: ${getYarnLog(id)}") }
-        } catch { case NonFatal(_) => }
-        throw e
-    } finally {
-      try {
-        s.stop()
-      } catch {
-        case NonFatal(e) => alert(s"Failed to stop session: $e")
-      }
-    }
-  }
-
-  // We need beforeAll() here because BatchIT's beforeAll() has to be executed after this.
-  // Please create an issue if this breaks test logging for cluster creation.
-  protected override def beforeAll() = {
-    cluster = Cluster.get()
-    httpClient = new AsyncHttpClient()
-    livyClient = new LivyRestClient(httpClient, livyEndpoint)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/Cluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/Cluster.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/Cluster.scala
deleted file mode 100644
index 06fa11d..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/Cluster.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.framework
-
-import java.io._
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.client.api.YarnClient
-
-import com.cloudera.livy.Logging
-
-/**
- * An common interface to run test on real cluster and mini cluster.
- */
-trait Cluster {
-  def deploy(): Unit
-  def cleanUp(): Unit
-  def configDir(): File
-  def isRealSpark(): Boolean
-  def hasSparkR(): Boolean
-
-  def runLivy(): Unit
-  def stopLivy(): Unit
-  def livyEndpoint: String
-  def hdfsScratchDir(): Path
-
-  def doAsClusterUser[T](task: => T): T
-
-  lazy val hadoopConf = {
-    val conf = new Configuration(false)
-    configDir().listFiles().foreach { f =>
-      if (f.getName().endsWith(".xml")) {
-        conf.addResource(new Path(f.toURI()))
-      }
-    }
-    conf
-  }
-
-  lazy val yarnConf = {
-    val conf = new Configuration(false)
-    conf.addResource(new Path(s"${configDir().getCanonicalPath}/yarn-site.xml"))
-    conf
-  }
-
-  lazy val fs = doAsClusterUser {
-    FileSystem.get(hadoopConf)
-  }
-
-  lazy val yarnClient = doAsClusterUser {
-    val c = YarnClient.createYarnClient()
-    c.init(yarnConf)
-    c.start()
-    c
-  }
-}
-
-object Cluster extends Logging {
-  private val CLUSTER_TYPE = "cluster.type"
-
-  private lazy val config = {
-    sys.props.get("cluster.spec")
-      .filter { path => path.nonEmpty && path != "default" }
-      .map { path =>
-        val in = Option(getClass.getClassLoader.getResourceAsStream(path))
-          .getOrElse(new FileInputStream(path))
-        val p = new Properties()
-        val reader = new InputStreamReader(in, UTF_8)
-        try {
-          p.load(reader)
-        } finally {
-          reader.close()
-        }
-        p.asScala.toMap
-      }
-      .getOrElse(Map(CLUSTER_TYPE -> "mini"))
-  }
-
-  private lazy val cluster = {
-    var _cluster: Cluster = null
-    try {
-      _cluster = config.get(CLUSTER_TYPE) match {
-        case Some("real") => new RealCluster(config)
-        case Some("mini") => new MiniCluster(config)
-        case t => throw new Exception(s"Unknown or unset cluster.type $t")
-      }
-      Runtime.getRuntime.addShutdownHook(new Thread {
-        override def run(): Unit = {
-          info("Shutting down cluster pool.")
-          _cluster.cleanUp()
-        }
-      })
-      _cluster.deploy()
-    } catch {
-      case e: Throwable =>
-        error("Failed to initialize cluster.", e)
-        Option(_cluster).foreach { c =>
-          Try(c.cleanUp()).recover { case e =>
-            error("Furthermore, failed to clean up cluster after failure.", e)
-          }
-        }
-        throw e
-    }
-    _cluster
-  }
-
-  def get(): Cluster = cluster
-
-  def isRunningOnTravis: Boolean = sys.env.contains("TRAVIS")
-}
-
-trait ClusterUtils {
-
-  protected def saveProperties(props: Map[String, String], dest: File): Unit = {
-    val jprops = new Properties()
-    props.foreach { case (k, v) => jprops.put(k, v) }
-
-    val tempFile = new File(dest.getAbsolutePath() + ".tmp")
-    val out = new OutputStreamWriter(new FileOutputStream(tempFile), UTF_8)
-    try {
-      jprops.store(out, "Configuration")
-    } finally {
-      out.close()
-    }
-    tempFile.renameTo(dest)
-  }
-
-  protected def loadProperties(file: File): Map[String, String] = {
-    val in = new InputStreamReader(new FileInputStream(file), UTF_8)
-    val props = new Properties()
-    try {
-      props.load(in)
-    } finally {
-      in.close()
-    }
-    props.asScala.toMap
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala
deleted file mode 100644
index ec63193..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.test.framework
-
-import java.util.regex.Pattern
-import javax.servlet.http.HttpServletResponse
-
-import scala.annotation.tailrec
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.{Either, Left, Right}
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import com.ning.http.client.AsyncHttpClient
-import com.ning.http.client.Response
-import org.apache.hadoop.yarn.api.records.ApplicationId
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy.server.batch.CreateBatchRequest
-import com.cloudera.livy.server.interactive.CreateInteractiveRequest
-import com.cloudera.livy.sessions.{Kind, SessionKindModule, SessionState}
-import com.cloudera.livy.utils.AppInfo
-
-object LivyRestClient {
-  private val BATCH_TYPE = "batches"
-  private val INTERACTIVE_TYPE = "sessions"
-
-  // TODO Define these in production code and share them with test code.
-  @JsonIgnoreProperties(ignoreUnknown = true)
-  private case class StatementResult(id: Int, state: String, output: Map[String, Any])
-
-  @JsonIgnoreProperties(ignoreUnknown = true)
-  case class StatementError(ename: String, evalue: String, stackTrace: Seq[String])
-
-  @JsonIgnoreProperties(ignoreUnknown = true)
-  case class SessionSnapshot(
-    id: Int,
-    appId: Option[String],
-    state: String,
-    appInfo: AppInfo,
-    log: IndexedSeq[String])
-}
-
-class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) {
-  import LivyRestClient._
-
-  val mapper = new ObjectMapper()
-    .registerModule(DefaultScalaModule)
-    .registerModule(new SessionKindModule())
-
-  class Session(val id: Int, sessionType: String) {
-    val url: String = s"$livyEndpoint/$sessionType/$id"
-
-    def appId(): ApplicationId = {
-      ConverterUtils.toApplicationId(snapshot().appId.get)
-    }
-
-    def snapshot(): SessionSnapshot = {
-      val r = httpClient.prepareGet(url).execute().get()
-      assertStatusCode(r, HttpServletResponse.SC_OK)
-
-      mapper.readValue(r.getResponseBodyAsStream, classOf[SessionSnapshot])
-    }
-
-    def stop(): Unit = {
-      httpClient.prepareDelete(url).execute().get()
-
-      eventually(timeout(30 seconds), interval(1 second)) {
-        verifySessionDoesNotExist()
-      }
-    }
-
-    def verifySessionState(state: SessionState): Unit = {
-      verifySessionState(Set(state))
-    }
-
-    def verifySessionState(states: Set[SessionState]): Unit = {
-      val t = if (Cluster.isRunningOnTravis) 5.minutes else 2.minutes
-      val strStates = states.map(_.toString)
-      // Travis uses very slow VM. It needs a longer timeout.
-      // Keeping the original timeout to avoid slowing down local development.
-      eventually(timeout(t), interval(1 second)) {
-        val s = snapshot().state
-        assert(strStates.contains(s), s"Session $id state $s doesn't equal one of $strStates")
-      }
-    }
-
-    def verifySessionDoesNotExist(): Unit = {
-      val r = httpClient.prepareGet(url).execute().get()
-      assertStatusCode(r, HttpServletResponse.SC_NOT_FOUND)
-    }
-  }
-
-  class BatchSession(id: Int) extends Session(id, BATCH_TYPE) {
-    def verifySessionDead(): Unit = verifySessionState(SessionState.Dead())
-    def verifySessionRunning(): Unit = verifySessionState(SessionState.Running())
-    def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success())
-  }
-
-  class InteractiveSession(id: Int) extends Session(id, INTERACTIVE_TYPE) {
-    class Statement(code: String) {
-      val stmtId = {
-        val requestBody = Map("code" -> code)
-        val r = httpClient.preparePost(s"$url/statements")
-          .setBody(mapper.writeValueAsString(requestBody))
-          .execute()
-          .get()
-        assertStatusCode(r, HttpServletResponse.SC_CREATED)
-
-        val newStmt = mapper.readValue(r.getResponseBodyAsStream, classOf[StatementResult])
-        newStmt.id
-      }
-
-      final def result(): Either[String, StatementError] = {
-        eventually(timeout(1 minute), interval(1 second)) {
-          val r = httpClient.prepareGet(s"$url/statements/$stmtId")
-            .execute()
-            .get()
-          assertStatusCode(r, HttpServletResponse.SC_OK)
-
-          val newStmt = mapper.readValue(r.getResponseBodyAsStream, classOf[StatementResult])
-          assert(newStmt.state == "available", s"Statement isn't available: ${newStmt.state}")
-
-          val output = newStmt.output
-          output.get("status") match {
-            case Some("ok") =>
-              val data = output("data").asInstanceOf[Map[String, Any]]
-              var rst = data.getOrElse("text/plain", "")
-              val magicRst = data.getOrElse("application/vnd.livy.table.v1+json", null)
-              if (magicRst != null) {
-                rst = mapper.writeValueAsString(magicRst)
-              }
-              Left(rst.asInstanceOf[String])
-            case Some("error") => Right(mapper.convertValue(output, classOf[StatementError]))
-            case Some(status) =>
-              throw new IllegalStateException(s"Unknown statement $stmtId status: $status")
-            case None =>
-              throw new IllegalStateException(s"Unknown statement $stmtId output: $newStmt")
-          }
-        }
-      }
-
-      def verifyResult(expectedRegex: String): Unit = {
-        result() match {
-          case Left(result) =>
-            if (expectedRegex != null) {
-              matchStrings(result, expectedRegex)
-            }
-          case Right(error) =>
-            assert(false, s"Got error from statement $stmtId $code: ${error.evalue}")
-        }
-      }
-
-      def verifyError(
-          ename: String = null, evalue: String = null, stackTrace: String = null): Unit = {
-        result() match {
-          case Left(result) =>
-            assert(false, s"Statement $stmtId `$code` expected to fail, but succeeded.")
-          case Right(error) =>
-            val remoteStack = Option(error.stackTrace).getOrElse(Nil).mkString("\n")
-            Seq(error.ename -> ename, error.evalue -> evalue, remoteStack -> stackTrace).foreach {
-              case (actual, expected) if expected != null => matchStrings(actual, expected)
-              case _ =>
-            }
-        }
-      }
-
-      private def matchStrings(actual: String, expected: String): Unit = {
-        val regex = Pattern.compile(expected, Pattern.DOTALL)
-        assert(regex.matcher(actual).matches(), s"$actual did not match regex $expected")
-      }
-    }
-
-    def run(code: String): Statement = { new Statement(code) }
-
-    def runFatalStatement(code: String): Unit = {
-      val requestBody = Map("code" -> code)
-      val r = httpClient.preparePost(s"$url/statements")
-        .setBody(mapper.writeValueAsString(requestBody))
-        .execute()
-
-      verifySessionState(SessionState.Dead())
-    }
-
-    def verifySessionIdle(): Unit = {
-      verifySessionState(SessionState.Idle())
-    }
-  }
-
-  def startBatch(
-      file: String,
-      className: Option[String],
-      args: List[String],
-      sparkConf: Map[String, String]): BatchSession = {
-    val r = new CreateBatchRequest()
-    r.file = file
-    r.className = className
-    r.args = args
-    r.conf = Map("spark.yarn.maxAppAttempts" -> "1") ++ sparkConf
-
-    val id = start(BATCH_TYPE, mapper.writeValueAsString(r))
-    new BatchSession(id)
-  }
-
-  def startSession(
-      kind: Kind,
-      sparkConf: Map[String, String],
-      heartbeatTimeoutInSecond: Int): InteractiveSession = {
-    val r = new CreateInteractiveRequest()
-    r.kind = kind
-    r.conf = sparkConf
-    r.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond
-
-    val id = start(INTERACTIVE_TYPE, mapper.writeValueAsString(r))
-    new InteractiveSession(id)
-  }
-
-  def connectSession(id: Int): InteractiveSession = { new InteractiveSession(id) }
-
-  private def start(sessionType: String, body: String): Int = {
-    val r = httpClient.preparePost(s"$livyEndpoint/$sessionType")
-      .setBody(body)
-      .execute()
-      .get()
-
-    assertStatusCode(r, HttpServletResponse.SC_CREATED)
-
-    val newSession = mapper.readValue(r.getResponseBodyAsStream, classOf[SessionSnapshot])
-    newSession.id
-  }
-
-  private def assertStatusCode(r: Response, expected: Int): Unit = {
-    def pretty(r: Response): String = {
-      s"${r.getStatusCode} ${r.getResponseBody}"
-    }
-    assert(r.getStatusCode() == expected, s"HTTP status code != $expected: ${pretty(r)}")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala
deleted file mode 100644
index edb25c6..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.framework
-
-import java.io._
-import java.nio.charset.Charset
-import java.nio.file.{Files, Paths}
-import javax.servlet.http.HttpServletResponse
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.ning.http.client.AsyncHttpClient
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hdfs.MiniDFSCluster
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.server.MiniYARNCluster
-import org.apache.spark.launcher.SparkLauncher
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.client.common.TestUtils
-import com.cloudera.livy.server.LivyServer
-
-private class MiniClusterConfig(val config: Map[String, String]) {
-
-  val nmCount = getInt("yarn.nm-count", 1)
-  val localDirCount = getInt("yarn.local-dir-count", 1)
-  val logDirCount = getInt("yarn.log-dir-count", 1)
-  val dnCount = getInt("hdfs.dn-count", 1)
-
-  private def getInt(key: String, default: Int): Int = {
-    config.get(key).map(_.toInt).getOrElse(default)
-  }
-
-}
-
-sealed trait MiniClusterUtils extends ClusterUtils {
-  private val livySparkScalaVersionEnvVarName = "LIVY_SPARK_SCALA_VERSION"
-
-  protected def getSparkScalaVersion(): String = {
-    sys.env.getOrElse(livySparkScalaVersionEnvVarName, {
-      throw new RuntimeException(s"Please specify env var $livySparkScalaVersionEnvVarName.")
-    })
-  }
-
-  protected def saveConfig(conf: Configuration, dest: File): Unit = {
-    val redacted = new Configuration(conf)
-    // This setting references a test class that is not available when using a real Spark
-    // installation, so remove it from client configs.
-    redacted.unset("net.topology.node.switch.mapping.impl")
-
-    val out = new FileOutputStream(dest)
-    try {
-      redacted.writeXml(out)
-    } finally {
-      out.close()
-    }
-  }
-
-}
-
-sealed abstract class MiniClusterBase extends MiniClusterUtils with Logging {
-
-  def main(args: Array[String]): Unit = {
-    val klass = getClass().getSimpleName()
-
-    info(s"$klass is starting up.")
-
-    val Array(configPath) = args
-    val config = {
-      val file = new File(s"$configPath/cluster.conf")
-      val props = loadProperties(file)
-      new MiniClusterConfig(props)
-    }
-    start(config, configPath)
-
-    info(s"$klass running.")
-
-    while (true) synchronized {
-      wait()
-    }
-  }
-
-  protected def start(config: MiniClusterConfig, configPath: String): Unit
-
-}
-
-object MiniHdfsMain extends MiniClusterBase {
-
-  override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
-    val hadoopConf = new Configuration()
-    val hdfsCluster = new MiniDFSCluster.Builder(hadoopConf)
-      .numDataNodes(config.dnCount)
-      .format(true)
-      .waitSafeMode(true)
-      .build()
-
-    hdfsCluster.waitActive()
-
-    saveConfig(hadoopConf, new File(configPath + "/core-site.xml"))
-  }
-
-}
-
-object MiniYarnMain extends MiniClusterBase {
-
-  override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
-    val baseConfig = new YarnConfiguration()
-    var yarnCluster = new MiniYARNCluster(getClass().getName(), config.nmCount,
-      config.localDirCount, config.logDirCount)
-    yarnCluster.init(baseConfig)
-
-    // Install a shutdown hook for stop the service and kill all running applications.
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run(): Unit = yarnCluster.stop()
-    })
-
-    yarnCluster.start()
-
-    // Workaround for YARN-2642.
-    val yarnConfig = yarnCluster.getConfig()
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      assert(yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0",
-        "RM not up yes.")
-    }
-
-    info(s"RM address in configuration is ${yarnConfig.get(YarnConfiguration.RM_ADDRESS)}")
-    saveConfig(yarnConfig, new File(configPath + "/yarn-site.xml"))
-  }
-
-}
-
-object MiniLivyMain extends MiniClusterBase {
-  var livyUrl: Option[String] = None
-
-  def start(config: MiniClusterConfig, configPath: String): Unit = {
-    var livyConf = Map(
-      LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
-      LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
-      LivyConf.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(),
-      LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
-      LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
-      LivyConf.RECOVERY_MODE.key -> "recovery",
-      LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
-      LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store")
-
-    if (Cluster.isRunningOnTravis) {
-      livyConf ++= Map("livy.server.yarn.app-lookup-timeout" -> "2m")
-    }
-
-    saveProperties(livyConf, new File(configPath + "/livy.conf"))
-
-    val server = new LivyServer()
-    server.start()
-    server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, true)
-    // Write a serverUrl.conf file to the conf directory with the location of the Livy
-    // server. Do it atomically since it's used by MiniCluster to detect when the Livy server
-    // is up and ready.
-    eventually(timeout(30 seconds), interval(1 second)) {
-      val serverUrlConf = Map("livy.server.server-url" -> server.serverUrl())
-      saveProperties(serverUrlConf, new File(configPath + "/serverUrl.conf"))
-    }
-  }
-}
-
-private case class ProcessInfo(process: Process, logFile: File)
-
-/**
- * Cluster implementation that uses HDFS / YARN mini clusters running as sub-processes locally.
- * Launching Livy through this mini cluster results in three child processes:
- *
- * - A HDFS mini cluster
- * - A YARN mini cluster
- * - The Livy server
- *
- * Each service will write its client configuration to a temporary directory managed by the
- * framework, so that applications can connect to the services.
- *
- * TODO: add support for MiniKdc.
- */
-class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterUtils with Logging {
-  private val tempDir = new File(s"${sys.props("java.io.tmpdir")}/livy-int-test")
-  private var sparkConfDir: File = _
-  private var _configDir: File = _
-  private var hdfs: Option[ProcessInfo] = None
-  private var yarn: Option[ProcessInfo] = None
-  private var livy: Option[ProcessInfo] = None
-  private var livyUrl: String = _
-  private var _hdfsScrathDir: Path = _
-
-  override def configDir(): File = _configDir
-
-  override def hdfsScratchDir(): Path = _hdfsScrathDir
-
-  override def isRealSpark(): Boolean = {
-    new File(sys.env("SPARK_HOME") + File.separator + "RELEASE").isFile()
-  }
-
-  override def hasSparkR(): Boolean = {
-    val path = Seq(sys.env("SPARK_HOME"), "R", "lib", "sparkr.zip").mkString(File.separator)
-    new File(path).isFile()
-  }
-
-  override def doAsClusterUser[T](task: => T): T = task
-
-  // Explicitly remove the "test-lib" dependency from the classpath of child processes. We
-  // want tests to explicitly upload this jar when necessary, to test those code paths.
-  private val childClasspath = {
-    val cp = sys.props("java.class.path").split(File.pathSeparator)
-    val filtered = cp.filter { path => !new File(path).getName().startsWith("livy-test-lib-") }
-    assert(cp.size != filtered.size, "livy-test-lib jar not found in classpath!")
-    filtered.mkString(File.pathSeparator)
-  }
-
-  override def deploy(): Unit = {
-    if (tempDir.exists()) {
-      FileUtils.deleteQuietly(tempDir)
-    }
-    assert(tempDir.mkdir(), "Cannot create temp test dir.")
-    sparkConfDir = mkdir("spark-conf")
-
-    // When running a real Spark cluster, don't set the classpath.
-    val extraCp = if (!isRealSpark()) {
-      val sparkScalaVersion = getSparkScalaVersion()
-      val classPathFile =
-        new File(s"minicluster-dependencies/scala-$sparkScalaVersion/target/classpath")
-      assert(classPathFile.isFile,
-        s"Cannot read MiniCluster classpath file: ${classPathFile.getCanonicalPath}")
-      val sparkClassPath =
-        FileUtils.readFileToString(classPathFile, Charset.defaultCharset())
-
-      val dummyJar = Files.createTempFile(Paths.get(tempDir.toURI), "dummy", "jar").toFile
-      Map(
-        SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sparkClassPath,
-        SparkLauncher.EXECUTOR_EXTRA_CLASSPATH -> sparkClassPath,
-        // Used for Spark 2.0. Spark 2.0 will upload specified jars to distributed cache in yarn
-        // mode, if not specified it will check jars folder. Here since jars folder is not
-        // existed, so it will throw exception.
-        "spark.yarn.jars" -> dummyJar.getAbsolutePath)
-    } else {
-      Map()
-    }
-
-    val sparkConf = extraCp ++ Map(
-      "spark.executor.instances" -> "1",
-      "spark.scheduler.minRegisteredResourcesRatio" -> "0.0",
-      "spark.ui.enabled" -> "false",
-      SparkLauncher.DRIVER_MEMORY -> "512m",
-      SparkLauncher.EXECUTOR_MEMORY -> "512m",
-      SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console",
-      SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console"
-    )
-    saveProperties(sparkConf, new File(sparkConfDir, "spark-defaults.conf"))
-
-    _configDir = mkdir("hadoop-conf")
-    saveProperties(config, new File(configDir, "cluster.conf"))
-    hdfs = Some(start(MiniHdfsMain.getClass, new File(configDir, "core-site.xml")))
-    yarn = Some(start(MiniYarnMain.getClass, new File(configDir, "yarn-site.xml")))
-    runLivy()
-
-    _hdfsScrathDir = fs.makeQualified(new Path("/"))
-  }
-
-  override def cleanUp(): Unit = {
-    Seq(hdfs, yarn, livy).flatten.foreach(stop)
-    hdfs = None
-    yarn = None
-    livy = None
-  }
-
-  def runLivy(): Unit = {
-    assert(!livy.isDefined)
-    val confFile = new File(configDir, "serverUrl.conf")
-    val jacocoArgs = Option(TestUtils.getJacocoArgs())
-      .map { args =>
-        Seq(args, s"-Djacoco.args=$args")
-      }.getOrElse(Nil)
-    val localLivy = start(MiniLivyMain.getClass, confFile, extraJavaArgs = jacocoArgs)
-
-    val props = loadProperties(confFile)
-    livyUrl = props("livy.server.server-url")
-
-    // Wait until Livy server responds.
-    val httpClient = new AsyncHttpClient()
-    eventually(timeout(30 seconds), interval(1 second)) {
-      val res = httpClient.prepareGet(livyUrl + "/metrics").execute().get()
-      assert(res.getStatusCode() == HttpServletResponse.SC_OK)
-    }
-
-    livy = Some(localLivy)
-  }
-
-  def stopLivy(): Unit = {
-    assert(livy.isDefined)
-    livy.foreach(stop)
-    livyUrl = null
-    livy = None
-  }
-
-  def livyEndpoint: String = livyUrl
-
-  private def mkdir(name: String, parent: File = tempDir): File = {
-    val dir = new File(parent, name)
-    if (!dir.exists()) {
-      assert(dir.mkdir(), s"Failed to create directory $name.")
-    }
-    dir
-  }
-
-  private def start(
-      klass: Class[_],
-      configFile: File,
-      extraJavaArgs: Seq[String] = Nil): ProcessInfo = {
-    val simpleName = klass.getSimpleName().stripSuffix("$")
-    val procDir = mkdir(simpleName)
-    val procTmp = mkdir("tmp", parent = procDir)
-
-    // Before starting anything, clean up previous running sessions.
-    sys.process.Process(s"pkill -f $simpleName") !
-
-    val java = sys.props("java.home") + "/bin/java"
-    val cmd =
-      Seq(
-        sys.props("java.home") + "/bin/java",
-        "-Dtest.appender=console",
-        "-Djava.io.tmpdir=" + procTmp.getAbsolutePath(),
-        "-cp", childClasspath + File.pathSeparator + configDir.getAbsolutePath(),
-        "-XX:MaxPermSize=256m") ++
-      extraJavaArgs ++
-      Seq(
-        klass.getName().stripSuffix("$"),
-        configDir.getAbsolutePath())
-
-    val logFile = new File(procDir, "output.log")
-    val pb = new ProcessBuilder(cmd.toArray: _*)
-      .directory(procDir)
-      .redirectErrorStream(true)
-      .redirectOutput(ProcessBuilder.Redirect.appendTo(logFile))
-
-    pb.environment().put("LIVY_CONF_DIR", configDir.getAbsolutePath())
-    pb.environment().put("HADOOP_CONF_DIR", configDir.getAbsolutePath())
-    pb.environment().put("SPARK_CONF_DIR", sparkConfDir.getAbsolutePath())
-    pb.environment().put("SPARK_LOCAL_IP", "127.0.0.1")
-
-    val child = pb.start()
-
-    // Wait for the config file to show up before returning, so that dependent services
-    // can see the configuration. Exit early if process dies.
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      assert(configFile.isFile(), s"$simpleName hasn't started yet.")
-
-      try {
-        val exitCode = child.exitValue()
-        throw new IOException(s"Child process exited unexpectedly (exit code $exitCode)")
-      } catch {
-        case _: IllegalThreadStateException => // Try again.
-      }
-    }
-
-    ProcessInfo(child, logFile)
-  }
-
-  private def stop(svc: ProcessInfo): Unit = {
-    svc.process.destroy()
-    svc.process.waitFor()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/com/cloudera/livy/test/framework/RealCluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/RealCluster.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/RealCluster.scala
deleted file mode 100644
index 9c79829..0000000
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/RealCluster.scala
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test.framework
-
-import java.io.{File, IOException}
-import java.nio.file.Files
-import java.security.PrivilegedExceptionAction
-import javax.servlet.http.HttpServletResponse._
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.sys.process._
-import scala.util.Random
-
-import com.decodified.scalassh._
-import com.ning.http.client.AsyncHttpClient
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.security.UserGroupInformation
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy.{LivyConf, Logging}
-
-private class RealClusterConfig(config: Map[String, String]) {
-  val ip = config("ip")
-
-  val sshLogin = config("ssh.login")
-  val sshPubKey = config("ssh.pubkey")
-  val livyPort = config.getOrElse(LivyConf.SERVER_PORT.key, "8998").toInt
-  val livyClasspath = config.getOrElse("livy.classpath", "")
-
-  val deployLivy = config.getOrElse("deploy-livy", "true").toBoolean
-  val noDeployLivyHome = config.get("livy-home")
-
-  val sparkHome = config("env.spark_home")
-  val sparkConf = config.getOrElse("env.spark_conf", "/etc/spark/conf")
-  val hadoopConf = config.getOrElse("env.hadoop_conf", "/etc/hadoop/conf")
-
-  val javaHome = config.getOrElse("env.java_home", "/usr/java/default")
-}
-
-class RealCluster(_config: Map[String, String])
-  extends Cluster with ClusterUtils with Logging {
-
-  private val config = new RealClusterConfig(_config)
-
-  private var livyIsRunning = false
-  private var livyHomePath: String = _
-  private var livyEpoch = 0
-
-  private var _configDir: File = _
-
-  private var hdfsScratch: Path = _
-
-  private var sparkConfDir: String = _
-  private var tempDirPath: String = _
-  private var _hasSparkR: Boolean = _
-
-  override def isRealSpark(): Boolean = true
-
-  override def hasSparkR(): Boolean = _hasSparkR
-
-  override def configDir(): File = _configDir
-
-  override def hdfsScratchDir(): Path = hdfsScratch
-
-  override def doAsClusterUser[T](task: => T): T = {
-    val user = UserGroupInformation.createRemoteUser(config.sshLogin)
-    user.doAs(new PrivilegedExceptionAction[T] {
-      override def run(): T = task
-    })
-  }
-
-  private def sshClient[T](body: SshClient => SSH.Result[T]): T = {
-    val sshLogin = PublicKeyLogin(
-      config.sshLogin, None, config.sshPubKey :: Nil)
-    val hostConfig = HostConfig(login = sshLogin, hostKeyVerifier = HostKeyVerifiers.DontVerify)
-    SSH(config.ip, hostConfig)(body) match {
-      case Left(err) => throw new IOException(err)
-      case Right(result) => result
-    }
-  }
-
-  private def exec(cmd: String): CommandResult = {
-    info(s"Running command: $cmd")
-    val result = sshClient(_.exec(cmd))
-    result.exitCode match {
-      case Some(ec) if ec > 0 =>
-        throw new IOException(s"Command '$cmd' failed: $ec\n" +
-          s"stdout: ${result.stdOutAsString()}\n" +
-          s"stderr: ${result.stdErrAsString()}\n")
-      case _ =>
-    }
-    result
-  }
-
-  private def upload(local: String, remote: String): Unit = {
-    info(s"Uploading local path $local")
-    sshClient(_.upload(local, remote))
-  }
-
-  private def download(remote: String, local: String): Unit = {
-    info(s"Downloading remote path $remote")
-    sshClient(_.download(remote, local))
-  }
-
-  override def deploy(): Unit = {
-    // Make sure Livy is not running.
-    stopLivy()
-
-    // Check whether SparkR is supported in YARN (need the sparkr.zip archive).\
-    _hasSparkR = try {
-      exec(s"test -f ${config.sparkHome}/R/lib/sparkr.zip")
-      true
-    } catch {
-      case e: IOException => false
-    }
-
-    // Copy the remove Hadoop configuration to a local temp dir so that tests can use it to
-    // talk to HDFS and YARN.
-    val localTemp = new File(sys.props("java.io.tmpdir") + File.separator + "hadoop-conf")
-    download(config.hadoopConf, localTemp.getAbsolutePath())
-    _configDir = localTemp
-
-    // Create a temp directory where test files will be written.
-    tempDirPath = s"/tmp/livy-it-${Random.alphanumeric.take(16).mkString}"
-    exec(s"mkdir -p $tempDirPath")
-
-    // Also create an HDFS scratch directory for tests.
-    doAsClusterUser {
-      hdfsScratch = fs.makeQualified(
-        new Path(s"/tmp/livy-it-${Random.alphanumeric.take(16).mkString}"))
-      fs.mkdirs(hdfsScratch)
-      fs.setPermission(hdfsScratch, new FsPermission("777"))
-    }
-
-    // Create a copy of the Spark configuration, and make sure the master is "yarn-cluster".
-    sparkConfDir = s"$tempDirPath/spark-conf"
-    val sparkProps = s"$sparkConfDir/spark-defaults.conf"
-    Seq(
-      s"cp -Lr ${config.sparkConf} $sparkConfDir",
-      s"touch $sparkProps",
-      s"sed -i.old '/spark.master.*/d' $sparkProps",
-      s"sed -i.old '/spark.submit.deployMode.*/d' $sparkProps",
-      s"echo 'spark.master=yarn-cluster' >> $sparkProps"
-    ).foreach(exec)
-
-    if (config.deployLivy) {
-      try {
-        info(s"Deploying Livy to ${config.ip}...")
-        val version = sys.props("project.version")
-        val assemblyZip = new File(s"../assembly/target/livy-server-$version.zip")
-        assert(assemblyZip.isFile,
-          s"Can't find livy assembly zip at ${assemblyZip.getCanonicalPath}")
-        val assemblyName = assemblyZip.getName()
-
-        // SSH to the node to unzip and install Livy.
-        upload(assemblyZip.getCanonicalPath, s"$tempDirPath/$assemblyName")
-        exec(s"unzip -o $tempDirPath/$assemblyName -d $tempDirPath")
-        livyHomePath = s"$tempDirPath/livy-server-$version"
-        info(s"Deployed Livy to ${config.ip} at $livyHomePath.")
-      } catch {
-        case e: Exception =>
-          error(s"Failed to deploy Livy to ${config.ip}.", e)
-          cleanUp()
-          throw e
-      }
-    } else {
-      livyHomePath = config.noDeployLivyHome.get
-      info("Skipping deployment.")
-    }
-
-    runLivy()
-  }
-
-  override def cleanUp(): Unit = {
-    stopLivy()
-    if (tempDirPath != null) {
-      exec(s"rm -rf $tempDirPath")
-    }
-    if (hdfsScratch != null) {
-      doAsClusterUser {
-        // Cannot use the shared `fs` since this runs in a shutdown hook, and that instance
-        // may have been closed already.
-        val fs = FileSystem.newInstance(hadoopConf)
-        try {
-          fs.delete(hdfsScratch, true)
-        } finally {
-          fs.close()
-        }
-      }
-    }
-  }
-
-  override def runLivy(): Unit = synchronized {
-    assert(!livyIsRunning, "Livy is running already.")
-
-    val livyConf = Map(
-      "livy.server.port" -> config.livyPort.toString,
-      // "livy.server.recovery.mode=local",
-      "livy.environment" -> "development",
-      LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
-      LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster")
-    val livyConfFile = File.createTempFile("livy.", ".properties")
-    saveProperties(livyConf, livyConfFile)
-    upload(livyConfFile.getAbsolutePath(), s"$livyHomePath/conf/livy.conf")
-
-    val env = Map(
-        "JAVA_HOME" -> config.javaHome,
-        "HADOOP_CONF_DIR" -> config.hadoopConf,
-        "SPARK_CONF_DIR" -> sparkConfDir,
-        "SPARK_HOME" -> config.sparkHome,
-        "CLASSPATH" -> config.livyClasspath,
-        "LIVY_PID_DIR" -> s"$tempDirPath/pid",
-        "LIVY_LOG_DIR" -> s"$tempDirPath/logs",
-        "LIVY_MAX_LOG_FILES" -> "16",
-        "LIVY_IDENT_STRING" -> "it"
-      )
-    val livyEnvFile = File.createTempFile("livy-env.", ".sh")
-    saveProperties(env, livyEnvFile)
-    upload(livyEnvFile.getAbsolutePath(), s"$livyHomePath/conf/livy-env.sh")
-
-    info(s"Starting Livy @ port ${config.livyPort}...")
-    exec(s"env -i $livyHomePath/bin/livy-server start")
-    livyIsRunning = true
-
-    val httpClient = new AsyncHttpClient()
-    eventually(timeout(1 minute), interval(1 second)) {
-      assert(httpClient.prepareGet(livyEndpoint).execute().get().getStatusCode == SC_OK)
-    }
-    info(s"Started Livy.")
-  }
-
-  override def stopLivy(): Unit = synchronized {
-    info("Stopping Livy Server")
-    try {
-      exec(s"$livyHomePath/bin/livy-server stop")
-    } catch {
-      case e: Exception =>
-        if (livyIsRunning) {
-          throw e
-        }
-    }
-
-    if (livyIsRunning) {
-      // Wait a tiny bit so that the process finishes closing its output files.
-      Thread.sleep(2)
-
-      livyEpoch += 1
-      val logName = "livy-it-server.out"
-      val localName = s"livy-it-server-$livyEpoch.out"
-      val logPath = s"$tempDirPath/logs/$logName"
-      val localLog = sys.props("java.io.tmpdir") + File.separator + localName
-      download(logPath, localLog)
-      info(s"Log for epoch $livyEpoch available at $localLog")
-    }
-  }
-
-  override def livyEndpoint: String = s"http://${config.ip}:${config.livyPort}"
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/BaseIntegrationTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/BaseIntegrationTestSuite.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/BaseIntegrationTestSuite.scala
new file mode 100644
index 0000000..d770e7e
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/BaseIntegrationTestSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.framework
+
+import java.io.File
+import java.util.UUID
+
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import com.ning.http.client.AsyncHttpClient
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.scalatest._
+
+abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with BeforeAndAfterAll {
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  var cluster: Cluster = _
+  var httpClient: AsyncHttpClient = _
+  var livyClient: LivyRestClient = _
+
+  protected def livyEndpoint: String = cluster.livyEndpoint
+
+  protected val testLib = sys.props("java.class.path")
+    .split(File.pathSeparator)
+    .find(new File(_).getName().startsWith("livy-test-lib-"))
+    .getOrElse(throw new Exception(s"Cannot find test lib in ${sys.props("java.class.path")}"))
+
+  protected def getYarnLog(appId: String): String = {
+    require(appId != null, "appId shouldn't be null")
+
+    val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
+    assert(appReport != null, "appReport shouldn't be null")
+
+    appReport.getDiagnostics()
+  }
+
+  protected def restartLivy(): Unit = {
+    val f = future {
+      cluster.stopLivy()
+      cluster.runLivy()
+    }
+    Await.result(f, 3 minutes)
+  }
+
+  /** Uploads a file to HDFS and returns just its path. */
+  protected def uploadToHdfs(file: File): String = {
+    val hdfsPath = new Path(cluster.hdfsScratchDir(),
+      UUID.randomUUID().toString() + "-" + file.getName())
+    cluster.fs.copyFromLocalFile(new Path(file.toURI()), hdfsPath)
+    hdfsPath.toUri().getPath()
+  }
+
+  /** Wrapper around test() to be used by pyspark tests. */
+  protected def pytest(desc: String)(testFn: => Unit): Unit = {
+    test(desc) {
+      assume(cluster.isRealSpark(), "PySpark tests require a real Spark installation.")
+      testFn
+    }
+  }
+
+  /** Wrapper around test() to be used by SparkR tests. */
+  protected def rtest(desc: String)(testFn: => Unit): Unit = {
+    test(desc) {
+      assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
+      assume(cluster.isRealSpark(), "SparkR tests require a real Spark installation.")
+      assume(cluster.hasSparkR(), "Spark under test does not support R.")
+      testFn
+    }
+  }
+
+  /** Clean up session and show info when test fails. */
+  protected def withSession[S <: LivyRestClient#Session, R]
+    (s: S)
+    (f: (S) => R): R = {
+    try {
+      f(s)
+    } catch {
+      case NonFatal(e) =>
+        try {
+          val state = s.snapshot()
+          info(s"Final session state: $state")
+          state.appId.foreach { id => info(s"YARN diagnostics: ${getYarnLog(id)}") }
+        } catch { case NonFatal(_) => }
+        throw e
+    } finally {
+      try {
+        s.stop()
+      } catch {
+        case NonFatal(e) => alert(s"Failed to stop session: $e")
+      }
+    }
+  }
+
+  // We need beforeAll() here because BatchIT's beforeAll() has to be executed after this.
+  // Please create an issue if this breaks test logging for cluster creation.
+  protected override def beforeAll() = {
+    cluster = Cluster.get()
+    httpClient = new AsyncHttpClient()
+    livyClient = new LivyRestClient(httpClient, livyEndpoint)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/Cluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/Cluster.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/Cluster.scala
new file mode 100644
index 0000000..e1b6844
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/Cluster.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.framework
+
+import java.io._
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.client.api.YarnClient
+
+import org.apache.livy.Logging
+
+/**
+ * A common interface for running tests on a real cluster or a mini cluster.
+ */
+trait Cluster {
+  def deploy(): Unit
+  def cleanUp(): Unit
+  def configDir(): File
+  def isRealSpark(): Boolean
+  def hasSparkR(): Boolean
+
+  def runLivy(): Unit
+  def stopLivy(): Unit
+  def livyEndpoint: String
+  def hdfsScratchDir(): Path
+
+  def doAsClusterUser[T](task: => T): T
+
+  lazy val hadoopConf = {
+    val conf = new Configuration(false)
+    configDir().listFiles().foreach { f =>
+      if (f.getName().endsWith(".xml")) {
+        conf.addResource(new Path(f.toURI()))
+      }
+    }
+    conf
+  }
+
+  lazy val yarnConf = {
+    val conf = new Configuration(false)
+    conf.addResource(new Path(s"${configDir().getCanonicalPath}/yarn-site.xml"))
+    conf
+  }
+
+  lazy val fs = doAsClusterUser {
+    FileSystem.get(hadoopConf)
+  }
+
+  lazy val yarnClient = doAsClusterUser {
+    val c = YarnClient.createYarnClient()
+    c.init(yarnConf)
+    c.start()
+    c
+  }
+}
+
+object Cluster extends Logging {
+  private val CLUSTER_TYPE = "cluster.type"
+
+  private lazy val config = {
+    sys.props.get("cluster.spec")
+      .filter { path => path.nonEmpty && path != "default" }
+      .map { path =>
+        val in = Option(getClass.getClassLoader.getResourceAsStream(path))
+          .getOrElse(new FileInputStream(path))
+        val p = new Properties()
+        val reader = new InputStreamReader(in, UTF_8)
+        try {
+          p.load(reader)
+        } finally {
+          reader.close()
+        }
+        p.asScala.toMap
+      }
+      .getOrElse(Map(CLUSTER_TYPE -> "mini"))
+  }
+
+  private lazy val cluster = {
+    var _cluster: Cluster = null
+    try {
+      _cluster = config.get(CLUSTER_TYPE) match {
+        case Some("real") => new RealCluster(config)
+        case Some("mini") => new MiniCluster(config)
+        case t => throw new Exception(s"Unknown or unset cluster.type $t")
+      }
+      Runtime.getRuntime.addShutdownHook(new Thread {
+        override def run(): Unit = {
+          info("Shutting down cluster pool.")
+          _cluster.cleanUp()
+        }
+      })
+      _cluster.deploy()
+    } catch {
+      case e: Throwable =>
+        error("Failed to initialize cluster.", e)
+        Option(_cluster).foreach { c =>
+          Try(c.cleanUp()).recover { case e =>
+            error("Furthermore, failed to clean up cluster after failure.", e)
+          }
+        }
+        throw e
+    }
+    _cluster
+  }
+
+  def get(): Cluster = cluster
+
+  def isRunningOnTravis: Boolean = sys.env.contains("TRAVIS")
+}
+
+trait ClusterUtils {
+
+  protected def saveProperties(props: Map[String, String], dest: File): Unit = {
+    val jprops = new Properties()
+    props.foreach { case (k, v) => jprops.put(k, v) }
+
+    val tempFile = new File(dest.getAbsolutePath() + ".tmp")
+    val out = new OutputStreamWriter(new FileOutputStream(tempFile), UTF_8)
+    try {
+      jprops.store(out, "Configuration")
+    } finally {
+      out.close()
+    }
+    tempFile.renameTo(dest)
+  }
+
+  protected def loadProperties(file: File): Map[String, String] = {
+    val in = new InputStreamReader(new FileInputStream(file), UTF_8)
+    val props = new Properties()
+    try {
+      props.load(in)
+    } finally {
+      in.close()
+    }
+    props.asScala.toMap
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
new file mode 100644
index 0000000..6d319c7
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.test.framework
+
+import java.util.regex.Pattern
+import javax.servlet.http.HttpServletResponse
+
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Either, Left, Right}
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.ning.http.client.AsyncHttpClient
+import com.ning.http.client.Response
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.server.batch.CreateBatchRequest
+import org.apache.livy.server.interactive.CreateInteractiveRequest
+import org.apache.livy.sessions.{Kind, SessionKindModule, SessionState}
+import org.apache.livy.utils.AppInfo
+
+object LivyRestClient {
+  private val BATCH_TYPE = "batches"
+  private val INTERACTIVE_TYPE = "sessions"
+
+  // TODO Define these in production code and share them with test code.
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  private case class StatementResult(id: Int, state: String, output: Map[String, Any])
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  case class StatementError(ename: String, evalue: String, stackTrace: Seq[String])
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  case class SessionSnapshot(
+    id: Int,
+    appId: Option[String],
+    state: String,
+    appInfo: AppInfo,
+    log: IndexedSeq[String])
+}
+
+class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) {
+  import LivyRestClient._
+
+  val mapper = new ObjectMapper()
+    .registerModule(DefaultScalaModule)
+    .registerModule(new SessionKindModule())
+
+  class Session(val id: Int, sessionType: String) {
+    val url: String = s"$livyEndpoint/$sessionType/$id"
+
+    def appId(): ApplicationId = {
+      ConverterUtils.toApplicationId(snapshot().appId.get)
+    }
+
+    def snapshot(): SessionSnapshot = {
+      val r = httpClient.prepareGet(url).execute().get()
+      assertStatusCode(r, HttpServletResponse.SC_OK)
+
+      mapper.readValue(r.getResponseBodyAsStream, classOf[SessionSnapshot])
+    }
+
+    def stop(): Unit = {
+      httpClient.prepareDelete(url).execute().get()
+
+      eventually(timeout(30 seconds), interval(1 second)) {
+        verifySessionDoesNotExist()
+      }
+    }
+
+    def verifySessionState(state: SessionState): Unit = {
+      verifySessionState(Set(state))
+    }
+
+    def verifySessionState(states: Set[SessionState]): Unit = {
+      val t = if (Cluster.isRunningOnTravis) 5.minutes else 2.minutes
+      val strStates = states.map(_.toString)
+      // Travis uses a very slow VM, so it needs a longer timeout.
+      // Keeping the original timeout to avoid slowing down local development.
+      eventually(timeout(t), interval(1 second)) {
+        val s = snapshot().state
+        assert(strStates.contains(s), s"Session $id state $s doesn't equal one of $strStates")
+      }
+    }
+
+    def verifySessionDoesNotExist(): Unit = {
+      val r = httpClient.prepareGet(url).execute().get()
+      assertStatusCode(r, HttpServletResponse.SC_NOT_FOUND)
+    }
+  }
+
+  class BatchSession(id: Int) extends Session(id, BATCH_TYPE) {
+    def verifySessionDead(): Unit = verifySessionState(SessionState.Dead())
+    def verifySessionRunning(): Unit = verifySessionState(SessionState.Running())
+    def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success())
+  }
+
+  class InteractiveSession(id: Int) extends Session(id, INTERACTIVE_TYPE) {
+    class Statement(code: String) {
+      val stmtId = {
+        val requestBody = Map("code" -> code)
+        val r = httpClient.preparePost(s"$url/statements")
+          .setBody(mapper.writeValueAsString(requestBody))
+          .execute()
+          .get()
+        assertStatusCode(r, HttpServletResponse.SC_CREATED)
+
+        val newStmt = mapper.readValue(r.getResponseBodyAsStream, classOf[StatementResult])
+        newStmt.id
+      }
+
+      final def result(): Either[String, StatementError] = {
+        eventually(timeout(1 minute), interval(1 second)) {
+          val r = httpClient.prepareGet(s"$url/statements/$stmtId")
+            .execute()
+            .get()
+          assertStatusCode(r, HttpServletResponse.SC_OK)
+
+          val newStmt = mapper.readValue(r.getResponseBodyAsStream, classOf[StatementResult])
+          assert(newStmt.state == "available", s"Statement isn't available: ${newStmt.state}")
+
+          val output = newStmt.output
+          output.get("status") match {
+            case Some("ok") =>
+              val data = output("data").asInstanceOf[Map[String, Any]]
+              var rst = data.getOrElse("text/plain", "")
+              val magicRst = data.getOrElse("application/vnd.livy.table.v1+json", null)
+              if (magicRst != null) {
+                rst = mapper.writeValueAsString(magicRst)
+              }
+              Left(rst.asInstanceOf[String])
+            case Some("error") => Right(mapper.convertValue(output, classOf[StatementError]))
+            case Some(status) =>
+              throw new IllegalStateException(s"Unknown statement $stmtId status: $status")
+            case None =>
+              throw new IllegalStateException(s"Unknown statement $stmtId output: $newStmt")
+          }
+        }
+      }
+
+      def verifyResult(expectedRegex: String): Unit = {
+        result() match {
+          case Left(result) =>
+            if (expectedRegex != null) {
+              matchStrings(result, expectedRegex)
+            }
+          case Right(error) =>
+            assert(false, s"Got error from statement $stmtId $code: ${error.evalue}")
+        }
+      }
+
+      def verifyError(
+          ename: String = null, evalue: String = null, stackTrace: String = null): Unit = {
+        result() match {
+          case Left(result) =>
+            assert(false, s"Statement $stmtId `$code` expected to fail, but succeeded.")
+          case Right(error) =>
+            val remoteStack = Option(error.stackTrace).getOrElse(Nil).mkString("\n")
+            Seq(error.ename -> ename, error.evalue -> evalue, remoteStack -> stackTrace).foreach {
+              case (actual, expected) if expected != null => matchStrings(actual, expected)
+              case _ =>
+            }
+        }
+      }
+
+      private def matchStrings(actual: String, expected: String): Unit = {
+        val regex = Pattern.compile(expected, Pattern.DOTALL)
+        assert(regex.matcher(actual).matches(), s"$actual did not match regex $expected")
+      }
+    }
+
+    def run(code: String): Statement = { new Statement(code) }
+
+    def runFatalStatement(code: String): Unit = {
+      val requestBody = Map("code" -> code)
+      val r = httpClient.preparePost(s"$url/statements")
+        .setBody(mapper.writeValueAsString(requestBody))
+        .execute()
+
+      verifySessionState(SessionState.Dead())
+    }
+
+    def verifySessionIdle(): Unit = {
+      verifySessionState(SessionState.Idle())
+    }
+  }
+
+  def startBatch(
+      file: String,
+      className: Option[String],
+      args: List[String],
+      sparkConf: Map[String, String]): BatchSession = {
+    val r = new CreateBatchRequest()
+    r.file = file
+    r.className = className
+    r.args = args
+    r.conf = Map("spark.yarn.maxAppAttempts" -> "1") ++ sparkConf
+
+    val id = start(BATCH_TYPE, mapper.writeValueAsString(r))
+    new BatchSession(id)
+  }
+
+  def startSession(
+      kind: Kind,
+      sparkConf: Map[String, String],
+      heartbeatTimeoutInSecond: Int): InteractiveSession = {
+    val r = new CreateInteractiveRequest()
+    r.kind = kind
+    r.conf = sparkConf
+    r.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond
+
+    val id = start(INTERACTIVE_TYPE, mapper.writeValueAsString(r))
+    new InteractiveSession(id)
+  }
+
+  def connectSession(id: Int): InteractiveSession = { new InteractiveSession(id) }
+
+  private def start(sessionType: String, body: String): Int = {
+    val r = httpClient.preparePost(s"$livyEndpoint/$sessionType")
+      .setBody(body)
+      .execute()
+      .get()
+
+    assertStatusCode(r, HttpServletResponse.SC_CREATED)
+
+    val newSession = mapper.readValue(r.getResponseBodyAsStream, classOf[SessionSnapshot])
+    newSession.id
+  }
+
+  private def assertStatusCode(r: Response, expected: Int): Unit = {
+    def pretty(r: Response): String = {
+      s"${r.getStatusCode} ${r.getResponseBody}"
+    }
+    assert(r.getStatusCode() == expected, s"HTTP status code != $expected: ${pretty(r)}")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala
new file mode 100644
index 0000000..005a3e9
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/MiniCluster.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.framework
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import javax.servlet.http.HttpServletResponse
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.ning.http.client.AsyncHttpClient
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.apache.spark.launcher.SparkLauncher
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.client.common.TestUtils
+import org.apache.livy.server.LivyServer
+
+/**
+ * Typed view over the mini cluster's key/value settings.
+ *
+ * Every count defaults to 1 when its key is absent from `config`. A value that is present
+ * but is not a valid integer fails construction with a NumberFormatException that names the
+ * offending key.
+ */
+private class MiniClusterConfig(val config: Map[String, String]) {
+
+  val nmCount = getInt("yarn.nm-count", 1)
+  val localDirCount = getInt("yarn.local-dir-count", 1)
+  val logDirCount = getInt("yarn.log-dir-count", 1)
+  val dnCount = getInt("hdfs.dn-count", 1)
+
+  /**
+   * Looks up `key` and parses it as an Int, ignoring surrounding whitespace.
+   *
+   * @return the parsed value, or `default` when the key is not set.
+   * @throws NumberFormatException if the value is set but cannot be parsed.
+   */
+  private def getInt(key: String, default: Int): Int = {
+    config.get(key).map { raw =>
+      try {
+        raw.trim.toInt
+      } catch {
+        // Re-throw the same exception type, but say which setting was wrong.
+        case _: NumberFormatException =>
+          throw new NumberFormatException(s"Invalid integer value '$raw' for setting '$key'.")
+      }
+    }.getOrElse(default)
+  }
+
+}
+
+/** Helpers shared by the mini cluster launcher and the service entry points in this file. */
+sealed trait MiniClusterUtils extends ClusterUtils {
+  private val scalaVersionEnvVar = "LIVY_SPARK_SCALA_VERSION"
+
+  /** Returns the value of the LIVY_SPARK_SCALA_VERSION env var, failing if it is not set. */
+  protected def getSparkScalaVersion(): String = {
+    sys.env.get(scalaVersionEnvVar) match {
+      case Some(version) => version
+      case None =>
+        throw new RuntimeException(s"Please specify env var $scalaVersionEnvVar.")
+    }
+  }
+
+  /**
+   * Writes a copy of `conf` to `dest` as XML.
+   *
+   * @param conf configuration to export; it is copied, not modified.
+   * @param dest file to write to.
+   */
+  protected def saveConfig(conf: Configuration, dest: File): Unit = {
+    val clientConf = new Configuration(conf)
+    // This setting references a test class that is not available when using a real Spark
+    // installation, so remove it from client configs.
+    clientConf.unset("net.topology.node.switch.mapping.impl")
+
+    val stream = new FileOutputStream(dest)
+    try clientConf.writeXml(stream) finally stream.close()
+  }
+
+}
+
+/**
+ * Common `main` for the child-process entry points in this file: loads
+ * `<config-dir>/cluster.conf`, starts the concrete service, then blocks forever.
+ */
+sealed abstract class MiniClusterBase extends MiniClusterUtils with Logging {
+
+  /**
+   * @param args exactly one element: the directory holding `cluster.conf`, which is also
+   *             passed to [[start]] as `configPath`.
+   * @throws IllegalArgumentException if the argument count is not exactly one.
+   */
+  def main(args: Array[String]): Unit = {
+    val klass = getClass().getSimpleName()
+
+    info(s"$klass is starting up.")
+
+    // Fail with a usage message instead of an opaque MatchError on a bad argument list.
+    require(args.length == 1,
+      s"Usage: $klass <config-dir> (got ${args.length} argument(s)).")
+    val configPath = args(0)
+    val config = {
+      val file = new File(s"$configPath/cluster.conf")
+      val props = loadProperties(file)
+      new MiniClusterConfig(props)
+    }
+    start(config, configPath)
+
+    info(s"$klass running.")
+
+    // Park the main thread indefinitely; the loop re-enters wait() after any wakeup.
+    while (true) synchronized {
+      wait()
+    }
+  }
+
+  /**
+   * Starts the concrete service.
+   *
+   * @param config settings parsed from `cluster.conf`.
+   * @param configPath the directory passed on the command line.
+   */
+  protected def start(config: MiniClusterConfig, configPath: String): Unit
+
+}
+
+/** Entry point that boots an HDFS mini cluster and writes its configuration to disk. */
+object MiniHdfsMain extends MiniClusterBase {
+
+  /**
+   * Builds a MiniDFSCluster with `config.dnCount` data nodes, waits for it to become active,
+   * then saves the Hadoop configuration as `core-site.xml` under `configPath`.
+   */
+  override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
+    val conf = new Configuration()
+    val builder = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(config.dnCount)
+      .format(true)
+      .waitSafeMode(true)
+    val cluster = builder.build()
+
+    cluster.waitActive()
+
+    val coreSite = new File(configPath + "/core-site.xml")
+    saveConfig(conf, coreSite)
+  }
+
+}
+
+/** Entry point that boots a YARN mini cluster and writes its configuration to disk. */
+object MiniYarnMain extends MiniClusterBase {
+
+  /**
+   * Starts a MiniYARNCluster sized from `config`, waits until the configured RM address
+   * carries a non-zero port, then saves the cluster configuration as `yarn-site.xml` under
+   * `configPath`.
+   */
+  override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
+    val baseConfig = new YarnConfiguration()
+    val yarnCluster = new MiniYARNCluster(getClass().getName(), config.nmCount,
+      config.localDirCount, config.logDirCount)
+    yarnCluster.init(baseConfig)
+
+    // Install a shutdown hook to stop the service and kill all running applications.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run(): Unit = yarnCluster.stop()
+    })
+
+    yarnCluster.start()
+
+    // Workaround for YARN-2642: poll until the RM address no longer has port 0.
+    val yarnConfig = yarnCluster.getConfig()
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      assert(yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0",
+        "RM not up yet.")
+    }
+
+    info(s"RM address in configuration is ${yarnConfig.get(YarnConfiguration.RM_ADDRESS)}")
+    saveConfig(yarnConfig, new File(configPath + "/yarn-site.xml"))
+  }
+
+}
+
+/** Entry point that writes `livy.conf`, starts a LivyServer, and publishes its URL. */
+object MiniLivyMain extends MiniClusterBase {
+  var livyUrl: Option[String] = None
+
+  /**
+   * Writes `livy.conf` under `configPath`, starts the server, then writes `serverUrl.conf`
+   * containing the server URL under the key `livy.server.server-url`.
+   */
+  def start(config: MiniClusterConfig, configPath: String): Unit = {
+    val baseConf = Map(
+      LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
+      LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
+      LivyConf.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(),
+      LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
+      LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
+      LivyConf.RECOVERY_MODE.key -> "recovery",
+      LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
+      LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store")
+
+    // The app lookup timeout is only overridden when running on Travis.
+    val travisConf =
+      if (Cluster.isRunningOnTravis) {
+        Map("livy.server.yarn.app-lookup-timeout" -> "2m")
+      } else {
+        Map.empty[String, String]
+      }
+
+    saveProperties(baseConf ++ travisConf, new File(configPath + "/livy.conf"))
+
+    val server = new LivyServer()
+    server.start()
+    server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, true)
+    // Publish the server location in serverUrl.conf inside the conf directory. MiniCluster
+    // waits for this file to appear to detect that the Livy server has started; the write is
+    // retried for up to 30 seconds.
+    eventually(timeout(30 seconds), interval(1 second)) {
+      val urlProps = Map("livy.server.server-url" -> server.serverUrl())
+      saveProperties(urlProps, new File(configPath + "/serverUrl.conf"))
+    }
+  }
+}
+
+/** A launched child process together with the file its output is appended to. */
+private case class ProcessInfo(process: Process, logFile: File)
+
+/**
+ * Cluster implementation that uses HDFS / YARN mini clusters running as sub-processes locally.
+ * Launching Livy through this mini cluster results in three child processes:
+ *
+ * - A HDFS mini cluster
+ * - A YARN mini cluster
+ * - The Livy server
+ *
+ * Each service will write its client configuration to a temporary directory managed by the
+ * framework, so that applications can connect to the services.
+ *
+ * TODO: add support for MiniKdc.
+ */
+class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterUtils with Logging {
+  // Root scratch directory; deleted (if present) and recreated on every deploy().
+  private val tempDir = new File(s"${sys.props("java.io.tmpdir")}/livy-int-test")
+  private var sparkConfDir: File = _
+  private var _configDir: File = _
+  private var hdfs: Option[ProcessInfo] = None
+  private var yarn: Option[ProcessInfo] = None
+  private var livy: Option[ProcessInfo] = None
+  private var livyUrl: String = _
+  private var _hdfsScratchDir: Path = _
+
+  override def configDir(): File = _configDir
+
+  override def hdfsScratchDir(): Path = _hdfsScratchDir
+
+  /** True when `$SPARK_HOME/RELEASE` exists as a regular file. */
+  override def isRealSpark(): Boolean = {
+    new File(sys.env("SPARK_HOME") + File.separator + "RELEASE").isFile()
+  }
+
+  /** True when `$SPARK_HOME/R/lib/sparkr.zip` exists as a regular file. */
+  override def hasSparkR(): Boolean = {
+    val path = Seq(sys.env("SPARK_HOME"), "R", "lib", "sparkr.zip").mkString(File.separator)
+    new File(path).isFile()
+  }
+
+  override def doAsClusterUser[T](task: => T): T = task
+
+  // Explicitly remove the "test-lib" dependency from the classpath of child processes. We
+  // want tests to explicitly upload this jar when necessary, to test those code paths.
+  private val childClasspath = {
+    val cp = sys.props("java.class.path").split(File.pathSeparator)
+    val filtered = cp.filter { path => !new File(path).getName().startsWith("livy-test-lib-") }
+    assert(cp.size != filtered.size, "livy-test-lib jar not found in classpath!")
+    filtered.mkString(File.pathSeparator)
+  }
+
+  /**
+   * Recreates the scratch directory, writes `spark-defaults.conf` and `cluster.conf`, then
+   * launches the HDFS, YARN and Livy child processes in that order.
+   */
+  override def deploy(): Unit = {
+    if (tempDir.exists()) {
+      FileUtils.deleteQuietly(tempDir)
+    }
+    assert(tempDir.mkdir(), "Cannot create temp test dir.")
+    sparkConfDir = mkdir("spark-conf")
+
+    // When running a real Spark cluster, don't set the classpath.
+    val extraCp = if (!isRealSpark()) {
+      val sparkScalaVersion = getSparkScalaVersion()
+      val classPathFile =
+        new File(s"minicluster-dependencies/scala-$sparkScalaVersion/target/classpath")
+      assert(classPathFile.isFile,
+        s"Cannot read MiniCluster classpath file: ${classPathFile.getCanonicalPath}")
+      val sparkClassPath =
+        FileUtils.readFileToString(classPathFile, Charset.defaultCharset())
+
+      val dummyJar = Files.createTempFile(Paths.get(tempDir.toURI), "dummy", "jar").toFile
+      Map(
+        SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sparkClassPath,
+        SparkLauncher.EXECUTOR_EXTRA_CLASSPATH -> sparkClassPath,
+        // Used for Spark 2.0. In yarn mode Spark 2.0 uploads the specified jars to the
+        // distributed cache; if none are specified it checks the jars folder, which does
+        // not exist here, and throws an exception.
+        "spark.yarn.jars" -> dummyJar.getAbsolutePath)
+    } else {
+      Map()
+    }
+
+    val sparkConf = extraCp ++ Map(
+      "spark.executor.instances" -> "1",
+      "spark.scheduler.minRegisteredResourcesRatio" -> "0.0",
+      "spark.ui.enabled" -> "false",
+      SparkLauncher.DRIVER_MEMORY -> "512m",
+      SparkLauncher.EXECUTOR_MEMORY -> "512m",
+      SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console",
+      SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console"
+    )
+    saveProperties(sparkConf, new File(sparkConfDir, "spark-defaults.conf"))
+
+    _configDir = mkdir("hadoop-conf")
+    saveProperties(config, new File(configDir, "cluster.conf"))
+    hdfs = Some(start(MiniHdfsMain.getClass, new File(configDir, "core-site.xml")))
+    yarn = Some(start(MiniYarnMain.getClass, new File(configDir, "yarn-site.xml")))
+    runLivy()
+
+    _hdfsScratchDir = fs.makeQualified(new Path("/"))
+  }
+
+  /** Stops every child process that is currently tracked and forgets all of them. */
+  override def cleanUp(): Unit = {
+    Seq(hdfs, yarn, livy).flatten.foreach(stop)
+    hdfs = None
+    yarn = None
+    livy = None
+  }
+
+  /**
+   * Launches the Livy server child process, reads its URL from `serverUrl.conf`, and waits
+   * until `GET <url>/metrics` answers with 200 OK. Must not be called while a Livy process
+   * is already tracked.
+   */
+  def runLivy(): Unit = {
+    assert(!livy.isDefined)
+    // NOTE(review): serverUrl.conf is not removed by stopLivy(), so after a restart the file
+    // from the previous run may already be present when start() polls for it - confirm that
+    // reading a stale URL here is acceptable.
+    val confFile = new File(configDir, "serverUrl.conf")
+    val jacocoArgs = Option(TestUtils.getJacocoArgs())
+      .map { args =>
+        Seq(args, s"-Djacoco.args=$args")
+      }.getOrElse(Nil)
+    val localLivy = start(MiniLivyMain.getClass, confFile, extraJavaArgs = jacocoArgs)
+
+    val props = loadProperties(confFile)
+    livyUrl = props("livy.server.server-url")
+
+    // Wait until Livy server responds. The client is only needed for this probe, so release
+    // it whether or not the probe succeeds.
+    val httpClient = new AsyncHttpClient()
+    try {
+      eventually(timeout(30 seconds), interval(1 second)) {
+        val res = httpClient.prepareGet(livyUrl + "/metrics").execute().get()
+        assert(res.getStatusCode() == HttpServletResponse.SC_OK)
+      }
+    } finally {
+      httpClient.close()
+    }
+
+    livy = Some(localLivy)
+  }
+
+  /** Stops the tracked Livy process and clears the endpoint. Requires a running Livy. */
+  def stopLivy(): Unit = {
+    assert(livy.isDefined)
+    livy.foreach(stop)
+    livyUrl = null
+    livy = None
+  }
+
+  def livyEndpoint: String = livyUrl
+
+  /** Returns `parent/name`, creating the directory first when it does not exist yet. */
+  private def mkdir(name: String, parent: File = tempDir): File = {
+    val dir = new File(parent, name)
+    if (!dir.exists()) {
+      assert(dir.mkdir(), s"Failed to create directory $name.")
+    }
+    dir
+  }
+
+  /**
+   * Launches `klass` (one of the `Mini*Main` objects) in a child JVM and waits for it to
+   * publish `configFile`.
+   *
+   * @param klass runtime class of the entry-point object; the trailing `$` is stripped to
+   *              obtain the main class name.
+   * @param configFile file whose appearance signals that the service has started.
+   * @param extraJavaArgs additional JVM arguments placed before the main class name.
+   * @return the child process and the log file its output is appended to.
+   */
+  private def start(
+      klass: Class[_],
+      configFile: File,
+      extraJavaArgs: Seq[String] = Nil): ProcessInfo = {
+    val simpleName = klass.getSimpleName().stripSuffix("$")
+    val procDir = mkdir(simpleName)
+    val procTmp = mkdir("tmp", parent = procDir)
+
+    // Before starting anything, clean up previous running sessions.
+    sys.process.Process(s"pkill -f $simpleName").!
+
+    val java = sys.props("java.home") + "/bin/java"
+    val cmd =
+      Seq(
+        java,
+        "-Dtest.appender=console",
+        "-Djava.io.tmpdir=" + procTmp.getAbsolutePath(),
+        "-cp", childClasspath + File.pathSeparator + configDir.getAbsolutePath(),
+        "-XX:MaxPermSize=256m") ++
+      extraJavaArgs ++
+      Seq(
+        klass.getName().stripSuffix("$"),
+        configDir.getAbsolutePath())
+
+    val logFile = new File(procDir, "output.log")
+    val pb = new ProcessBuilder(cmd.toArray: _*)
+      .directory(procDir)
+      .redirectErrorStream(true)
+      .redirectOutput(ProcessBuilder.Redirect.appendTo(logFile))
+
+    val env = pb.environment()
+    env.put("LIVY_CONF_DIR", configDir.getAbsolutePath())
+    env.put("HADOOP_CONF_DIR", configDir.getAbsolutePath())
+    env.put("SPARK_CONF_DIR", sparkConfDir.getAbsolutePath())
+    env.put("SPARK_LOCAL_IP", "127.0.0.1")
+
+    val child = pb.start()
+
+    // Wait for the config file to show up before returning, so that dependent services
+    // can see the configuration. The liveness check runs first so that, when the child has
+    // died, the failure reported on timeout carries its exit code rather than the generic
+    // "hasn't started yet" message.
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      try {
+        val exitCode = child.exitValue()
+        throw new IOException(s"Child process exited unexpectedly (exit code $exitCode)")
+      } catch {
+        case _: IllegalThreadStateException => // Still running; keep waiting.
+      }
+
+      assert(configFile.isFile(), s"$simpleName hasn't started yet.")
+    }
+
+    ProcessInfo(child, logFile)
+  }
+
+  /** Destroys the child process and blocks until it has exited. */
+  private def stop(svc: ProcessInfo): Unit = {
+    svc.process.destroy()
+    svc.process.waitFor()
+  }
+
+}