You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:37 UTC

[08/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
deleted file mode 100644
index 7d7b94e..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
+++ /dev/null
@@ -1,609 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.interactive
-
-import java.io.{File, InputStream}
-import java.net.URI
-import java.nio.ByteBuffer
-import java.nio.file.{Files, Paths}
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.util.Random
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-import com.google.common.annotations.VisibleForTesting
-import org.apache.hadoop.fs.Path
-import org.apache.spark.launcher.SparkLauncher
-
-import com.cloudera.livy._
-import com.cloudera.livy.client.common.HttpMessages._
-import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf}
-import com.cloudera.livy.rsc.driver.Statement
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions._
-import com.cloudera.livy.sessions.Session._
-import com.cloudera.livy.sessions.SessionState.Dead
-import com.cloudera.livy.util.LineBufferedProcess
-import com.cloudera.livy.utils.{AppInfo, LivySparkUtils, SparkApp, SparkAppListener}
-
-@JsonIgnoreProperties(ignoreUnknown = true)
-case class InteractiveRecoveryMetadata(
-    id: Int,
-    appId: Option[String],
-    appTag: String,
-    kind: Kind,
-    heartbeatTimeoutS: Int,
-    owner: String,
-    proxyUser: Option[String],
-    rscDriverUri: Option[URI],
-    version: Int = 1)
-  extends RecoveryMetadata
-
-object InteractiveSession extends Logging {
-  private[interactive] val SPARK_YARN_IS_PYTHON = "spark.yarn.isPython"
-
-  val RECOVERY_SESSION_TYPE = "interactive"
-
-  def create(
-      id: Int,
-      owner: String,
-      proxyUser: Option[String],
-      livyConf: LivyConf,
-      request: CreateInteractiveRequest,
-      sessionStore: SessionStore,
-      mockApp: Option[SparkApp] = None,
-      mockClient: Option[RSCClient] = None): InteractiveSession = {
-    val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}"
-
-    val client = mockClient.orElse {
-      val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
-        request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
-
-      val builderProperties = prepareBuilderProp(conf, request.kind, livyConf)
-
-      val userOpts: Map[String, Option[String]] = Map(
-        "spark.driver.cores" -> request.driverCores.map(_.toString),
-        SparkLauncher.DRIVER_MEMORY -> request.driverMemory.map(_.toString),
-        SparkLauncher.EXECUTOR_CORES -> request.executorCores.map(_.toString),
-        SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString),
-        "spark.executor.instances" -> request.numExecutors.map(_.toString),
-        "spark.app.name" -> request.name.map(_.toString),
-        "spark.yarn.queue" -> request.queue
-      )
-
-      userOpts.foreach { case (key, opt) =>
-        opt.foreach { value => builderProperties.put(key, value) }
-      }
-
-      builderProperties.getOrElseUpdate("spark.app.name", s"livy-session-$id")
-
-      info(s"Creating Interactive session $id: [owner: $owner, request: $request]")
-      val builder = new LivyClientBuilder()
-        .setAll(builderProperties.asJava)
-        .setConf("livy.client.session-id", id.toString)
-        .setConf(RSCConf.Entry.DRIVER_CLASS.key(), "com.cloudera.livy.repl.ReplDriver")
-        .setConf(RSCConf.Entry.PROXY_USER.key(), proxyUser.orNull)
-        .setURI(new URI("rsc:/"))
-
-      Option(builder.build().asInstanceOf[RSCClient])
-    }
-
-    new InteractiveSession(
-      id,
-      None,
-      appTag,
-      client,
-      SessionState.Starting(),
-      request.kind,
-      request.heartbeatTimeoutInSecond,
-      livyConf,
-      owner,
-      proxyUser,
-      sessionStore,
-      mockApp)
-  }
-
-  def recover(
-      metadata: InteractiveRecoveryMetadata,
-      livyConf: LivyConf,
-      sessionStore: SessionStore,
-      mockApp: Option[SparkApp] = None,
-      mockClient: Option[RSCClient] = None): InteractiveSession = {
-    val client = mockClient.orElse(metadata.rscDriverUri.map { uri =>
-      val builder = new LivyClientBuilder().setURI(uri)
-      builder.build().asInstanceOf[RSCClient]
-    })
-
-    new InteractiveSession(
-      metadata.id,
-      metadata.appId,
-      metadata.appTag,
-      client,
-      SessionState.Recovering(),
-      metadata.kind,
-      metadata.heartbeatTimeoutS,
-      livyConf,
-      metadata.owner,
-      metadata.proxyUser,
-      sessionStore,
-      mockApp)
-  }
-
-  @VisibleForTesting
-  private[interactive] def prepareBuilderProp(
-    conf: Map[String, String],
-    kind: Kind,
-    livyConf: LivyConf): mutable.Map[String, String] = {
-
-    val builderProperties = mutable.Map[String, String]()
-    builderProperties ++= conf
-
-    def livyJars(livyConf: LivyConf, scalaVersion: String): List[String] = {
-      Option(livyConf.get(LivyConf.REPL_JARS)).map { jars =>
-        val regex = """[\w-]+_(\d\.\d\d).*\.jar""".r
-        jars.split(",").filter { name => new Path(name).getName match {
-            // Filter out unmatched scala jars
-            case regex(ver) => ver == scalaVersion
-            // Keep all the java jars end with ".jar"
-            case _ => name.endsWith(".jar")
-          }
-        }.toList
-      }.getOrElse {
-        val home = sys.env("LIVY_HOME")
-        val jars = Option(new File(home, s"repl_$scalaVersion-jars"))
-          .filter(_.isDirectory())
-          .getOrElse(new File(home, s"repl/scala-$scalaVersion/target/jars"))
-        require(jars.isDirectory(), "Cannot find Livy REPL jars.")
-        jars.listFiles().map(_.getAbsolutePath()).toList
-      }
-    }
-
-    def findSparkRArchive(): Option[String] = {
-      Option(livyConf.get(RSCConf.Entry.SPARKR_PACKAGE.key())).orElse {
-        sys.env.get("SPARK_HOME").map { case sparkHome =>
-          val path = Seq(sparkHome, "R", "lib", "sparkr.zip").mkString(File.separator)
-          val rArchivesFile = new File(path)
-          require(rArchivesFile.exists(), "sparkr.zip not found; cannot run sparkr application.")
-          rArchivesFile.getAbsolutePath()
-        }
-      }
-    }
-
-    def datanucleusJars(livyConf: LivyConf, sparkMajorVersion: Int): Seq[String] = {
-      if (sys.env.getOrElse("LIVY_INTEGRATION_TEST", "false").toBoolean) {
-        // datanucleus jars has already been in classpath in integration test
-        Seq.empty
-      } else {
-        val sparkHome = livyConf.sparkHome().get
-        val libdir = sparkMajorVersion match {
-          case 1 =>
-            if (new File(sparkHome, "RELEASE").isFile) {
-              new File(sparkHome, "lib")
-            } else {
-              new File(sparkHome, "lib_managed/jars")
-            }
-          case 2 =>
-            if (new File(sparkHome, "RELEASE").isFile) {
-              new File(sparkHome, "jars")
-            } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) {
-              new File(sparkHome, "assembly/target/scala-2.11/jars")
-            } else {
-              new File(sparkHome, "assembly/target/scala-2.10/jars")
-            }
-          case v =>
-            throw new RuntimeException("Unsupported spark major version:" + sparkMajorVersion)
-        }
-        val jars = if (!libdir.isDirectory) {
-          Seq.empty[String]
-        } else {
-          libdir.listFiles().filter(_.getName.startsWith("datanucleus-"))
-            .map(_.getAbsolutePath).toSeq
-        }
-        if (jars.isEmpty) {
-          warn("datanucleus jars can not be found")
-        }
-        jars
-      }
-    }
-
-    /**
-     * Look for hive-site.xml (for now just ignore spark.files defined in spark-defaults.conf)
-     * 1. First look for hive-site.xml in user request
-     * 2. Then look for that under classpath
-     * @param livyConf
-     * @return  (hive-site.xml path, whether it is provided by user)
-     */
-    def hiveSiteFile(sparkFiles: Array[String], livyConf: LivyConf): (Option[File], Boolean) = {
-      if (sparkFiles.exists(_.split("/").last == "hive-site.xml")) {
-        (None, true)
-      } else {
-        val hiveSiteURL = getClass.getResource("/hive-site.xml")
-        if (hiveSiteURL != null && hiveSiteURL.getProtocol == "file") {
-          (Some(new File(hiveSiteURL.toURI)), false)
-        } else {
-          (None, false)
-        }
-      }
-    }
-
-    def findPySparkArchives(): Seq[String] = {
-      Option(livyConf.get(RSCConf.Entry.PYSPARK_ARCHIVES))
-        .map(_.split(",").toSeq)
-        .getOrElse {
-          sys.env.get("SPARK_HOME") .map { case sparkHome =>
-            val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
-            val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
-            require(pyArchivesFile.exists(),
-              "pyspark.zip not found; cannot run pyspark application in YARN mode.")
-
-            val py4jFile = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip")
-              .iterator()
-              .next()
-              .toFile
-
-            require(py4jFile.exists(),
-              "py4j-*-src.zip not found; cannot run pyspark application in YARN mode.")
-            Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath)
-          }.getOrElse(Seq())
-        }
-    }
-
-    def mergeConfList(list: Seq[String], key: String): Unit = {
-      if (list.nonEmpty) {
-        builderProperties.get(key) match {
-          case None =>
-            builderProperties.put(key, list.mkString(","))
-          case Some(oldList) =>
-            val newList = (oldList :: list.toList).mkString(",")
-            builderProperties.put(key, newList)
-        }
-      }
-    }
-
-    def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = {
-      val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String])
-      hiveSiteFile(sparkFiles, livyConf) match {
-        case (_, true) =>
-          debug("Enable HiveContext because hive-site.xml is found in user request.")
-          mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
-        case (Some(file), false) =>
-          debug("Enable HiveContext because hive-site.xml is found under classpath, "
-            + file.getAbsolutePath)
-          mergeConfList(List(file.getAbsolutePath), LivyConf.SPARK_FILES)
-          mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
-        case (None, false) =>
-          warn("Enable HiveContext but no hive-site.xml found under" +
-            " classpath or user request.")
-      }
-    }
-
-    kind match {
-      case PySpark() | PySpark3() =>
-        val pySparkFiles = if (!LivyConf.TEST_MODE) findPySparkArchives() else Nil
-        mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES)
-        builderProperties.put(SPARK_YARN_IS_PYTHON, "true")
-      case SparkR() =>
-        val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
-        sparkRArchive.foreach { archive =>
-          builderProperties.put(RSCConf.Entry.SPARKR_PACKAGE.key(), archive + "#sparkr")
-        }
-      case _ =>
-    }
-    builderProperties.put(RSCConf.Entry.SESSION_KIND.key, kind.toString)
-
-    // Set Livy.rsc.jars from livy conf to rsc conf, RSC conf will take precedence if both are set.
-    Option(livyConf.get(LivyConf.RSC_JARS)).foreach(
-      builderProperties.getOrElseUpdate(RSCConf.Entry.LIVY_JARS.key(), _))
-
-    require(livyConf.get(LivyConf.LIVY_SPARK_VERSION) != null)
-    require(livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION) != null)
-
-    val (sparkMajorVersion, _) =
-      LivySparkUtils.formatSparkVersion(livyConf.get(LivyConf.LIVY_SPARK_VERSION))
-    val scalaVersion = livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION)
-
-    mergeConfList(livyJars(livyConf, scalaVersion), LivyConf.SPARK_JARS)
-    val enableHiveContext = livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
-    // pass spark.livy.spark_major_version to driver
-    builderProperties.put("spark.livy.spark_major_version", sparkMajorVersion.toString)
-
-    if (sparkMajorVersion <= 1) {
-      builderProperties.put("spark.repl.enableHiveContext",
-        livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT).toString)
-    } else {
-      val confVal = if (enableHiveContext) "hive" else "in-memory"
-      builderProperties.put("spark.sql.catalogImplementation", confVal)
-    }
-
-    if (enableHiveContext) {
-      mergeHiveSiteAndHiveDeps(sparkMajorVersion)
-    }
-
-    builderProperties
-  }
-}
-
-class InteractiveSession(
-    id: Int,
-    appIdHint: Option[String],
-    appTag: String,
-    client: Option[RSCClient],
-    initialState: SessionState,
-    val kind: Kind,
-    heartbeatTimeoutS: Int,
-    livyConf: LivyConf,
-    owner: String,
-    override val proxyUser: Option[String],
-    sessionStore: SessionStore,
-    mockApp: Option[SparkApp]) // For unit test.
-  extends Session(id, owner, livyConf)
-  with SessionHeartbeat
-  with SparkAppListener {
-
-  import InteractiveSession._
-
-  private var serverSideState: SessionState = initialState
-
-  override protected val heartbeatTimeout: FiniteDuration = {
-    val heartbeatTimeoutInSecond = heartbeatTimeoutS
-    Duration(heartbeatTimeoutInSecond, TimeUnit.SECONDS)
-  }
-  private val operations = mutable.Map[Long, String]()
-  private val operationCounter = new AtomicLong(0)
-  private var rscDriverUri: Option[URI] = None
-  private var sessionLog: IndexedSeq[String] = IndexedSeq.empty
-  private val sessionSaveLock = new Object()
-
-  _appId = appIdHint
-  sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
-  heartbeat()
-
-  private val app = mockApp.orElse {
-    if (livyConf.isRunningOnYarn()) {
-      val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
-        .map(new LineBufferedProcess(_))
-      // When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
-      // (e.g. Reflect YARN application state to session state).
-      Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
-    } else {
-      // When Livy is running with other cluster manager, SparkApp doesn't provide any
-      // additional benefit over controlling RSCDriver using RSCClient. Don't use it.
-      None
-    }
-  }
-
-  if (client.isEmpty) {
-    transition(Dead())
-    val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown."
-    info(msg)
-    sessionLog = IndexedSeq(msg)
-  } else {
-    val uriFuture = Future { client.get.getServerUri.get() }
-
-    uriFuture onSuccess { case url =>
-      rscDriverUri = Option(url)
-      sessionSaveLock.synchronized {
-        sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
-      }
-    }
-    uriFuture onFailure { case e => warn("Fail to get rsc uri", e) }
-
-    // Send a dummy job that will return once the client is ready to be used, and set the
-    // state to "idle" at that point.
-    client.get.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() {
-      override def onJobQueued(job: JobHandle[Void]): Unit = { }
-      override def onJobStarted(job: JobHandle[Void]): Unit = { }
-
-      override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()
-
-      override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut()
-
-      override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
-        transition(SessionState.Running())
-        info(s"Interactive session $id created [appid: ${appId.orNull}, owner: $owner, proxyUser:" +
-          s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " +
-          s"info: ${appInfo.asJavaMap}]")
-      }
-
-      private def errorOut(): Unit = {
-        // Other code might call stop() to close the RPC channel. When RPC channel is closing,
-        // this callback might be triggered. Check and don't call stop() to avoid nested called
-        // if the session is already shutting down.
-        if (serverSideState != SessionState.ShuttingDown()) {
-          transition(SessionState.Error())
-          stop()
-          app.foreach { a =>
-            info(s"Failed to ping RSC driver for session $id. Killing application.")
-            a.kill()
-          }
-        }
-      }
-    })
-  }
-
-  override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(sessionLog)
-
-  override def recoveryMetadata: RecoveryMetadata =
-    InteractiveRecoveryMetadata(
-      id, appId, appTag, kind, heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)
-
-  override def state: SessionState = {
-    if (serverSideState.isInstanceOf[SessionState.Running]) {
-      // If session is in running state, return the repl state from RSCClient.
-      client
-        .flatMap(s => Option(s.getReplState))
-        .map(SessionState(_))
-        .getOrElse(SessionState.Busy()) // If repl state is unknown, assume repl is busy.
-    } else {
-      serverSideState
-    }
-  }
-
-  override def stopSession(): Unit = {
-    try {
-      transition(SessionState.ShuttingDown())
-      sessionStore.remove(RECOVERY_SESSION_TYPE, id)
-      client.foreach { _.stop(true) }
-    } catch {
-      case _: Exception =>
-        app.foreach {
-          warn(s"Failed to stop RSCDriver. Killing it...")
-          _.kill()
-        }
-    } finally {
-      transition(SessionState.Dead())
-    }
-  }
-
-  def statements: IndexedSeq[Statement] = {
-    ensureActive()
-    val r = client.get.getReplJobResults().get()
-    r.statements.toIndexedSeq
-  }
-
-  def getStatement(stmtId: Int): Option[Statement] = {
-    ensureActive()
-    val r = client.get.getReplJobResults(stmtId, 1).get()
-    if (r.statements.length < 1) {
-      None
-    } else {
-      Option(r.statements(0))
-    }
-  }
-
-  def interrupt(): Future[Unit] = {
-    stop()
-  }
-
-  def executeStatement(content: ExecuteRequest): Statement = {
-    ensureRunning()
-    recordActivity()
-
-    val id = client.get.submitReplCode(content.code).get
-    client.get.getReplJobResults(id, 1).get().statements(0)
-  }
-
-  def cancelStatement(statementId: Int): Unit = {
-    ensureRunning()
-    recordActivity()
-    client.get.cancelReplCode(statementId)
-  }
-
-  def runJob(job: Array[Byte]): Long = {
-    performOperation(job, true)
-  }
-
-  def submitJob(job: Array[Byte]): Long = {
-    performOperation(job, false)
-  }
-
-  def addFile(fileStream: InputStream, fileName: String): Unit = {
-    addFile(copyResourceToHDFS(fileStream, fileName))
-  }
-
-  def addJar(jarStream: InputStream, jarName: String): Unit = {
-    addJar(copyResourceToHDFS(jarStream, jarName))
-  }
-
-  def addFile(uri: URI): Unit = {
-    ensureActive()
-    recordActivity()
-    client.get.addFile(resolveURI(uri, livyConf)).get()
-  }
-
-  def addJar(uri: URI): Unit = {
-    ensureActive()
-    recordActivity()
-    client.get.addJar(resolveURI(uri, livyConf)).get()
-  }
-
-  def jobStatus(id: Long): Any = {
-    ensureActive()
-    val clientJobId = operations(id)
-    recordActivity()
-    // TODO: don't block indefinitely?
-    val status = client.get.getBypassJobStatus(clientJobId).get()
-    new JobStatus(id, status.state, status.result, status.error)
-  }
-
-  def cancelJob(id: Long): Unit = {
-    ensureActive()
-    recordActivity()
-    operations.remove(id).foreach { client.get.cancel }
-  }
-
-  private def transition(newState: SessionState) = synchronized {
-    // When a statement returns an error, the session should transit to error state.
-    // If the session crashed because of the error, the session should instead go to dead state.
-    // Since these 2 transitions are triggered by different threads, there's a race condition.
-    // Make sure we won't transit from dead to error state.
-    val areSameStates = serverSideState.getClass() == newState.getClass()
-    val transitFromInactiveToActive = !serverSideState.isActive && newState.isActive
-    if (!areSameStates && !transitFromInactiveToActive) {
-      debug(s"$this session state change from ${serverSideState} to $newState")
-      serverSideState = newState
-    }
-  }
-
-  private def ensureActive(): Unit = synchronized {
-    require(serverSideState.isActive, "Session isn't active.")
-    require(client.isDefined, "Session is active but client hasn't been created.")
-  }
-
-  private def ensureRunning(): Unit = synchronized {
-    serverSideState match {
-      case SessionState.Running() =>
-      case _ =>
-        throw new IllegalStateException("Session is in state %s" format serverSideState)
-    }
-  }
-
-  private def performOperation(job: Array[Byte], sync: Boolean): Long = {
-    ensureActive()
-    recordActivity()
-    val future = client.get.bypass(ByteBuffer.wrap(job), sync)
-    val opId = operationCounter.incrementAndGet()
-    operations(opId) = future
-    opId
-   }
-
-  override def appIdKnown(appId: String): Unit = {
-    _appId = Option(appId)
-    sessionSaveLock.synchronized {
-      sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
-    }
-  }
-
-  override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
-    synchronized {
-      debug(s"$this app state changed from $oldState to $newState")
-      newState match {
-        case SparkApp.State.FINISHED | SparkApp.State.KILLED | SparkApp.State.FAILED =>
-          transition(SessionState.Dead())
-        case _ =>
-      }
-    }
-  }
-
-  override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala
deleted file mode 100644
index c2263db..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.interactive
-
-import java.net.URI
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.JavaConverters._
-import scala.concurrent._
-import scala.concurrent.duration._
-
-import org.json4s.jackson.Json4sScalaModule
-import org.scalatra._
-import org.scalatra.servlet.FileUploadSupport
-
-import com.cloudera.livy.{ExecuteRequest, JobHandle, LivyConf, Logging}
-import com.cloudera.livy.client.common.HttpMessages
-import com.cloudera.livy.client.common.HttpMessages._
-import com.cloudera.livy.rsc.driver.Statement
-import com.cloudera.livy.server.SessionServlet
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions._
-
-object InteractiveSessionServlet extends Logging
-
-class InteractiveSessionServlet(
-    sessionManager: InteractiveSessionManager,
-    sessionStore: SessionStore,
-    livyConf: LivyConf)
-  extends SessionServlet(sessionManager, livyConf)
-  with SessionHeartbeatNotifier[InteractiveSession, InteractiveRecoveryMetadata]
-  with FileUploadSupport
-{
-
-  mapper.registerModule(new SessionKindModule())
-    .registerModule(new Json4sScalaModule())
-
-  override protected def createSession(req: HttpServletRequest): InteractiveSession = {
-    val createRequest = bodyAs[CreateInteractiveRequest](req)
-    val proxyUser = checkImpersonation(createRequest.proxyUser, req)
-    InteractiveSession.create(
-      sessionManager.nextId(),
-      remoteUser(req),
-      proxyUser,
-      livyConf,
-      createRequest,
-      sessionStore)
-  }
-
-  override protected[interactive] def clientSessionView(
-      session: InteractiveSession,
-      req: HttpServletRequest): Any = {
-    val logs =
-      if (hasAccess(session.owner, req)) {
-        Option(session.logLines())
-          .map { lines =>
-            val size = 10
-            var from = math.max(0, lines.length - size)
-            val until = from + size
-
-            lines.view(from, until)
-          }
-          .getOrElse(Nil)
-      } else {
-        Nil
-      }
-
-    new SessionInfo(session.id, session.appId.orNull, session.owner, session.proxyUser.orNull,
-      session.state.toString, session.kind.toString, session.appInfo.asJavaMap, logs.asJava)
-  }
-
-  post("/:id/stop") {
-    withSession { session =>
-      Await.ready(session.stop(), Duration.Inf)
-      NoContent()
-    }
-  }
-
-  post("/:id/interrupt") {
-    withSession { session =>
-      Await.ready(session.interrupt(), Duration.Inf)
-      Ok(Map("msg" -> "interrupted"))
-    }
-  }
-
-  get("/:id/statements") {
-    withSession { session =>
-      val statements = session.statements
-      val from = params.get("from").map(_.toInt).getOrElse(0)
-      val size = params.get("size").map(_.toInt).getOrElse(statements.length)
-
-      Map(
-        "total_statements" -> statements.length,
-        "statements" -> statements.view(from, from + size)
-      )
-    }
-  }
-
-  val getStatement = get("/:id/statements/:statementId") {
-    withSession { session =>
-      val statementId = params("statementId").toInt
-
-      session.getStatement(statementId).getOrElse(NotFound("Statement not found"))
-    }
-  }
-
-  jpost[ExecuteRequest]("/:id/statements") { req =>
-    withSession { session =>
-      val statement = session.executeStatement(req)
-
-      Created(statement,
-        headers = Map(
-          "Location" -> url(getStatement,
-            "id" -> session.id.toString,
-            "statementId" -> statement.id.toString)))
-    }
-  }
-
-  post("/:id/statements/:statementId/cancel") {
-    withSession { session =>
-      val statementId = params("statementId")
-      session.cancelStatement(statementId.toInt)
-      Ok(Map("msg" -> "canceled"))
-    }
-  }
-  // This endpoint is used by the client-http module to "connect" to an existing session and
-  // update its last activity time. It performs authorization checks to make sure the caller
-  // has access to the session, so even though it returns the same data, it behaves differently
-  // from get("/:id").
-  post("/:id/connect") {
-    withSession { session =>
-      session.recordActivity()
-      Ok(clientSessionView(session, request))
-    }
-  }
-
-  jpost[SerializedJob]("/:id/submit-job") { req =>
-    withSession { session =>
-      try {
-      require(req.job != null && req.job.length > 0, "no job provided.")
-      val jobId = session.submitJob(req.job)
-      Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
-      } catch {
-        case e: Throwable =>
-          e.printStackTrace()
-        throw e
-      }
-    }
-  }
-
-  jpost[SerializedJob]("/:id/run-job") { req =>
-    withSession { session =>
-      require(req.job != null && req.job.length > 0, "no job provided.")
-      val jobId = session.runJob(req.job)
-      Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
-    }
-  }
-
-  post("/:id/upload-jar") {
-    withSession { lsession =>
-      fileParams.get("jar") match {
-        case Some(file) =>
-          lsession.addJar(file.getInputStream, file.name)
-        case None =>
-          BadRequest("No jar sent!")
-      }
-    }
-  }
-
-  post("/:id/upload-pyfile") {
-    withSession { lsession =>
-      fileParams.get("file") match {
-        case Some(file) =>
-          lsession.addJar(file.getInputStream, file.name)
-        case None =>
-          BadRequest("No file sent!")
-      }
-    }
-  }
-
-  post("/:id/upload-file") {
-    withSession { lsession =>
-      fileParams.get("file") match {
-        case Some(file) =>
-          lsession.addFile(file.getInputStream, file.name)
-        case None =>
-          BadRequest("No file sent!")
-      }
-    }
-  }
-
-  jpost[AddResource]("/:id/add-jar") { req =>
-    withSession { lsession =>
-      addJarOrPyFile(req, lsession)
-    }
-  }
-
-  jpost[AddResource]("/:id/add-pyfile") { req =>
-    withSession { lsession =>
-      lsession.kind match {
-        case PySpark() | PySpark3() => addJarOrPyFile(req, lsession)
-        case _ => BadRequest("Only supported for pyspark sessions.")
-      }
-    }
-  }
-
-  jpost[AddResource]("/:id/add-file") { req =>
-    withSession { lsession =>
-      val uri = new URI(req.uri)
-      lsession.addFile(uri)
-    }
-  }
-
-  get("/:id/jobs/:jobid") {
-    withSession { lsession =>
-      val jobId = params("jobid").toLong
-      Ok(lsession.jobStatus(jobId))
-    }
-  }
-
-  post("/:id/jobs/:jobid/cancel") {
-    withSession { lsession =>
-      val jobId = params("jobid").toLong
-      lsession.cancelJob(jobId)
-    }
-  }
-
-  private def addJarOrPyFile(req: HttpMessages.AddResource, session: InteractiveSession): Unit = {
-    val uri = new URI(req.uri)
-    session.addJar(uri)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/interactive/SessionHeartbeat.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/SessionHeartbeat.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/SessionHeartbeat.scala
deleted file mode 100644
index 20bc582..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/SessionHeartbeat.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.server.interactive
-
-import java.util.Date
-
-import scala.concurrent.duration.{Deadline, Duration, FiniteDuration}
-
-import com.cloudera.livy.sessions.Session.RecoveryMetadata
-import com.cloudera.livy.LivyConf
-import com.cloudera.livy.server.SessionServlet
-import com.cloudera.livy.sessions.{Session, SessionManager}
-
-/**
-  * Mixin that records heartbeats for a session and reports whether the heartbeat has expired.
-  * A deadline is only armed inside heartbeat(), so a session that never received a heartbeat
-  * never expires; a heartbeatTimeout that is not greater than zero never arms one either.
-  */
-trait SessionHeartbeat {
-  protected val heartbeatTimeout: FiniteDuration
-
-  // Time of the latest heartbeat, kept for reporting only. Null until the first heartbeat.
-  private var lastBeatAt: Date = _
-  // Moment after which the session counts as expired. None while no deadline is armed.
-  private var expiresAt: Option[Deadline] = None
-
-  /** Records a heartbeat now and, when the timeout is positive, re-arms the deadline. */
-  def heartbeat(): Unit = synchronized {
-    val timeoutEnabled = heartbeatTimeout > Duration.Zero
-    if (timeoutEnabled) expiresAt = Some(heartbeatTimeout.fromNow)
-    lastBeatAt = new Date()
-  }
-
-  /** Time of the most recent heartbeat() call, or null if there has been none. */
-  def lastHeartbeat: Date = synchronized { lastBeatAt }
-
-  /** True only when a deadline has been armed and is already overdue. */
-  def heartbeatExpired: Boolean = synchronized {
-    expiresAt match {
-      case Some(deadline) => deadline.isOverdue()
-      case None => false
-    }
-  }
-}
-
-/**
-  * Servlet mixin that refreshes a session's heartbeat right before the session is handed to
-  * the request handler, for every call that goes through withSession or
-  * withUnprotectedSession, i.e. the /sessions/:id REST calls such as GET /sessions/:id.
-  * Note: GET /sessions doesn't update heartbeats.
-  */
-trait SessionHeartbeatNotifier[S <: Session with SessionHeartbeat, R <: RecoveryMetadata]
-  extends SessionServlet[S, R] {
-
-  abstract override protected def withUnprotectedSession(fn: (S => Any)): Any =
-    super.withUnprotectedSession(beatThen(fn))
-
-  abstract override protected def withSession(fn: (S => Any)): Any =
-    super.withSession(beatThen(fn))
-
-  // Wraps a handler so the session's heartbeat is recorded immediately before the handler runs.
-  private def beatThen(fn: S => Any): S => Any = { session =>
-    session.heartbeat()
-    fn(session)
-  }
-}
-
-/**
-  * A SessionManager trait.
-  * It owns a daemon thread that periodically deletes sessions whose heartbeat has expired.
-  */
-trait SessionHeartbeatWatchdog[S <: Session with SessionHeartbeat, R <: RecoveryMetadata] {
-  self: SessionManager[S, R] =>
-
-  private val watchdogThread = new Thread(s"HeartbeatWatchdog-${self.getClass.getName}") {
-    override def run(): Unit = {
-      // The check interval is read once, when the thread starts.
-      val interval = livyConf.getTimeAsMs(LivyConf.HEARTBEAT_WATCHDOG_INTERVAL)
-      info("Heartbeat watchdog thread started.")
-      // Runs until the thread dies; an exception escaping the loop body (for example an
-      // InterruptedException from sleep) ends the thread.
-      while (true) {
-        deleteExpiredSessions()
-        Thread.sleep(interval)
-      }
-    }
-  }
-
-  /** Starts the watchdog as a daemon thread; asserts that it is not already running. */
-  protected def start(): Unit = {
-    assert(!watchdogThread.isAlive())
-
-    watchdogThread.setDaemon(true)
-    watchdogThread.start()
-  }
-
-  /**
-    * Deletes every session whose heartbeat has expired. A non-fatal failure while deleting one
-    * session is logged and does not stop the remaining sessions from being processed.
-    */
-  private[interactive] def deleteExpiredSessions(): Unit = {
-    // Delete takes time. If we use .filter().foreach() here, the time difference between we check
-    // expiration and the time we delete the session might be huge. To avoid that, check expiration
-    // inside the foreach block.
-    sessions.values.foreach { s =>
-      if (s.heartbeatExpired) {
-        info(s"Session ${s.id} expired. Last heartbeat is at ${s.lastHeartbeat}.")
-        try { delete(s) } catch {
-          // Only non-fatal errors are swallowed: fatal ones (OutOfMemoryError,
-          // InterruptedException, ...) must propagate instead of being logged and ignored.
-          case scala.util.control.NonFatal(e) =>
-            warn(s"Exception was thrown when deleting expired session ${s.id}", e)
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala
deleted file mode 100644
index 89b133e..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import scala.reflect.ClassTag
-
-import com.cloudera.livy.LivyConf
-
-/**
- * No-op StateStore: writes and removals are discarded and every lookup comes back empty.
- * Livy will use this when session recovery is disabled.
- */
-class BlackholeStateStore(livyConf: LivyConf) extends StateStore(livyConf) {
-  // Nothing is ever persisted.
-  override def set(key: String, value: Object): Unit = ()
-
-  // No key is ever present.
-  override def get[T: ClassTag](key: String): Option[T] = Option.empty[T]
-
-  // No key ever has children.
-  override def getChildren(key: String): Seq[String] = Nil
-
-  // There is never anything to remove.
-  override def remove(key: String): Unit = ()
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala
deleted file mode 100644
index 5e17678..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import java.io.{FileNotFoundException, IOException}
-import java.net.URI
-import java.util
-
-import scala.reflect.ClassTag
-import scala.util.control.NonFatal
-
-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
-import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.Utils.usingResource
-
-/**
- * StateStore backed by a Hadoop FileContext: every key is a file below the path of the URI
- * configured in LivyConf.RECOVERY_STATE_STORE_URL.
- *
- * @param livyConf Livy configuration; the state store URL in it must not be empty.
- * @param mockFileContext when defined, used instead of the FileContext looked up from the URL
- *                        (presumably for tests).
- */
-class FileSystemStateStore(
-    livyConf: LivyConf,
-    mockFileContext: Option[FileContext])
-  extends StateStore(livyConf) with Logging {
-
-  // Constructor defined for StateStore factory to new this class using reflection.
-  def this(livyConf: LivyConf) {
-    this(livyConf, None)
-  }
-
-  private val fsUri = {
-    val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
-    require(!fsPath.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
-    new URI(fsPath)
-  }
-
-  private val fileContext: FileContext = mockFileContext.getOrElse {
-    FileContext.getFileContext(fsUri)
-  }
-
-  {
-    // Only Livy user should have access to state files.
-    fileContext.setUMask(new FsPermission("077"))
-
-    // Create state store dir if it doesn't exist.
-    val stateStorePath = absPath(".")
-    try {
-      fileContext.mkdir(stateStorePath, FsPermission.getDirDefault(), true)
-    } catch {
-      case _: FileAlreadyExistsException =>
-        if (!fileContext.getFileStatus(stateStorePath).isDirectory()) {
-          throw new IOException(s"$stateStorePath is not a directory.")
-        }
-    }
-
-    // Check permission of state store dir.
-    val fileStatus = fileContext.getFileStatus(absPath("."))
-    require(fileStatus.getPermission.getUserAction() == FsAction.ALL,
-      s"Livy doesn't have permission to access state store: $fsUri.")
-    if (fileStatus.getPermission.getGroupAction != FsAction.NONE) {
-      warn(s"Group users have permission to access state store: $fsUri. This is insecure.")
-    }
-    if (fileStatus.getPermission.getOtherAction != FsAction.NONE) {
-      warn(s"Other users have permission to access state store: $fsUri. This is insecure.")
-    }
-  }
-
-  /**
-   * Serializes `value` and stores it under `key`, replacing any existing value.
-   * The bytes go to "<key>.tmp" first and are then renamed over the final path.
-   */
-  override def set(key: String, value: Object): Unit = {
-    // Write to a temp file then rename to avoid file corruption if livy-server crashes
-    // in the middle of the write.
-    val tmpPath = absPath(s"$key.tmp")
-    val createFlag = util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-
-    usingResource(fileContext.create(tmpPath, createFlag, CreateOpts.createParent())) { tmpFile =>
-      tmpFile.write(serializeToBytes(value))
-      // Closed explicitly so the data is complete on disk before the rename below.
-      tmpFile.close()
-      // Assume rename is atomic.
-      fileContext.rename(tmpPath, absPath(key), Rename.OVERWRITE)
-    }
-
-    // Best effort: remove the ".<name>.crc" file left next to the temp file, if any.
-    try {
-      val crcPath = new Path(tmpPath.getParent, s".${tmpPath.getName}.crc")
-      fileContext.delete(crcPath, false)
-    } catch {
-      case NonFatal(_) => // Swallow the exception.
-    }
-  }
-
-  /**
-   * Reads and deserializes the value stored under `key`.
-   * @return None when the file does not exist or cannot be read (the latter is logged).
-   */
-  override def get[T: ClassTag](key: String): Option[T] = {
-    try {
-      usingResource(fileContext.open(absPath(key))) { is =>
-        Option(deserialize[T](IOUtils.toByteArray(is)))
-      }
-    } catch {
-      case _: FileNotFoundException => None
-      case e: IOException =>
-        warn(s"Failed to read $key from state store.", e)
-        None
-    }
-  }
-
-  /**
-   * Lists the file names directly below `key`.
-   * @return Empty when the path does not exist or cannot be listed (the latter is logged).
-   */
-  override def getChildren(key: String): Seq[String] = {
-    try {
-      fileContext.util.listStatus(absPath(key)).map(_.getPath.getName)
-    } catch {
-      case _: FileNotFoundException => Seq.empty
-      case e: IOException =>
-        warn(s"Failed to list $key from state store.", e)
-        Seq.empty
-    }
-  }
-
-  /** Deletes the file stored under `key` (non-recursively); a missing file is not an error. */
-  override def remove(key: String): Unit = {
-    try {
-      fileContext.delete(absPath(key), false)
-    } catch {
-      // StateStore.remove must not throw when the key doesn't exist.
-      case _: FileNotFoundException =>
-    }
-  }
-
-  // Resolves a key against the path component of the configured state store URI.
-  private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala
deleted file mode 100644
index b7a178c..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import java.io.IOException
-
-import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
-import scala.util.control.NonFatal
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.sessions.Session.RecoveryMetadata
-
-/** State persisted for a session manager: the next session id to hand out. */
-private[recovery] case class SessionManagerState(nextSessionId: Int)
-
-/**
- * SessionStore provides high level functions to get/save session state from/to StateStore.
- * Keys have the form "<store version>/<session type>/<id>"; the session manager's own state
- * is kept under "<store version>/<session type>/state".
- */
-class SessionStore(
-    livyConf: LivyConf,
-    store: => StateStore = StateStore.get) // For unit testing.
-  extends Logging {
-
-  private val STORE_VERSION: String = "v1"
-
-  /**
-   * Persist a session to the session state store.
-   * @param sessionType Session type the session belongs to.
-   * @param m RecoveryMetadata for the session; stored under the key derived from m.id.
-   */
-  def save(sessionType: String, m: RecoveryMetadata): Unit = {
-    val key = sessionPath(sessionType, m.id)
-    store.set(key, m)
-  }
-
-  /** Persist `id` as the next session id for the specified session type. */
-  def saveNextSessionId(sessionType: String, id: Int): Unit = {
-    val key = sessionManagerPath(sessionType)
-    store.set(key, SessionManagerState(id))
-  }
-
-  /**
-   * Return all sessions stored in the store with specified session type.
-   * Each element is a Success with the stored metadata, or a Failure wrapping the error
-   * raised while reading that session.
-   */
-  def getAllSessions[T <: RecoveryMetadata : ClassTag](sessionType: String): Seq[Try[T]] = {
-    // Children whose name is not an integer are not sessions (e.g. the "state" key).
-    val sessionIds = store.getChildren(sessionPath(sessionType))
-      .flatMap { child => Try(child.toInt).toOption }
-    sessionIds.flatMap { id => loadSession[T](sessionPath(sessionType, id)) }
-  }
-
-  /**
-   * Return the next unused session id with specified session type.
-   * It checks the SessionManagerState stored and returns the next free session id.
-   * If no SessionManagerState is stored, it returns 0.
-   *
-   * @throws Exception If SessionManagerState stored is corrupted, it throws an error.
-   */
-  def getNextSessionId(sessionType: String): Int = {
-    val stored = store.get[SessionManagerState](sessionManagerPath(sessionType))
-    stored.fold(0)(_.nextSessionId)
-  }
-
-  /**
-   * Remove a session from the state store.
-   */
-  def remove(sessionType: String, id: Int): Unit = {
-    val key = sessionPath(sessionType, id)
-    store.remove(key)
-  }
-
-  // Reads one session; None when nothing is stored at p, a Failure when the read throws.
-  private def loadSession[T <: RecoveryMetadata : ClassTag](p: String): Option[Try[T]] = {
-    try {
-      store.get[T](p).map(Success(_))
-    } catch {
-      case NonFatal(e) => Some(Failure(new IOException(s"Error getting session $p", e)))
-    }
-  }
-
-  private def sessionPath(sessionType: String): String =
-    s"$STORE_VERSION/$sessionType"
-
-  private def sessionPath(sessionType: String, id: Int): String =
-    s"$STORE_VERSION/$sessionType/$id"
-
-  private def sessionManagerPath(sessionType: String): String =
-    s"$STORE_VERSION/$sessionType/state"
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala
deleted file mode 100644
index 25eb238..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.server.recovery
-
-import scala.reflect.{classTag, ClassTag}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.sessions.SessionKindModule
-import com.cloudera.livy.sessions.SessionManager._
-
-/** JSON (de)serialization helpers built on a Jackson ObjectMapper. */
-protected trait JsonMapper {
-  // ObjectMapper with the Scala module and the SessionKind module registered.
-  protected val mapper = {
-    val jackson = new ObjectMapper()
-    jackson.registerModule(DefaultScalaModule)
-    jackson.registerModule(new SessionKindModule())
-    jackson
-  }
-
-  /** Serializes `value` to its JSON bytes. */
-  def serializeToBytes(value: Object): Array[Byte] = {
-    mapper.writeValueAsBytes(value)
-  }
-
-  /** Parses JSON bytes into an instance of T's runtime class. */
-  def deserialize[T: ClassTag](json: Array[Byte]): T = {
-    val targetClass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
-    mapper.readValue(json, targetClass)
-  }
-}
-
-/**
- * Interface of a key-value storage for state.
- * It's responsible for de/serialization and for retrieving/storing objects.
- * It's the low level interface used by higher level classes like SessionStore.
- *
- * Hardcoded to use JSON serialization for now for easier ops. Will add better serialization later.
- */
-abstract class StateStore(livyConf: LivyConf) extends JsonMapper {
-  /**
-   * Set a key-value pair in this state store. It overwrites any existing value.
-   * @param key Key to store the value under.
-   * @param value Object to serialize and store.
-   * @throws Exception Thrown when persisting to the state store fails.
-   */
-  def set(key: String, value: Object): Unit
-
-  /**
-   * Get the value stored under a key in this state store.
-   * @param key Key to look up.
-   * @return Value if the key exists. None if the key doesn't exist.
-   * @throws Exception Thrown when deserialization of the stored value fails.
-   */
-  def get[T: ClassTag](key: String): Option[T]
-
-  /**
-   * Treat keys in this state store as a directory tree and
-   * return names of the direct children of the key.
-   * @param key Key whose direct children are listed.
-   * @return List of names of the direct children of the key.
-   *         Empty list if the key doesn't exist or has no child.
-   */
-  def getChildren(key: String): Seq[String]
-
-  /**
-   * Remove the key from this state store. Does not throw if the key doesn't exist.
-   * @param key Key to remove.
-   * @throws Exception Thrown when persisting to the state store fails.
-   */
-  def remove(key: String): Unit
-}
-
-/**
- * Factory for, and holder of, the single StateStore instance chosen in LivyConf.
- */
-object StateStore extends Logging {
-  private[this] var stateStore: Option[StateStore] = None
-
-  /**
-   * Instantiates the configured store through its (LivyConf) constructor.
-   * Does nothing when a store is already held, i.e. until cleanup() is called.
-   */
-  def init(livyConf: LivyConf): Unit = synchronized {
-    if (stateStore.isEmpty) {
-      val storeClass = pickStateStore(livyConf)
-      val constructor = storeClass.getDeclaredConstructor(classOf[LivyConf])
-      stateStore = Option(constructor.newInstance(livyConf).asInstanceOf[StateStore])
-      info(s"Using ${stateStore.get.getClass.getSimpleName} for recovery.")
-    }
-  }
-
-  /** Drops the held store so that a later init() creates a new one. */
-  def cleanup(): Unit = synchronized {
-    stateStore = None
-  }
-
-  /**
-   * Returns the held store; asserts that init() has been called.
-   * NOTE(review): unlike init() and cleanup(), this accessor is not synchronized.
-   */
-  def get: StateStore = {
-    assert(stateStore.isDefined, "StateStore hasn't been initialized.")
-    stateStore.get
-  }
-
-  /**
-   * Maps the configured recovery mode (and, in recovery mode, the configured state store
-   * name) to the StateStore implementation class.
-   * @throws IllegalArgumentException for an unknown recovery mode or state store name.
-   */
-  private[recovery] def pickStateStore(livyConf: LivyConf): Class[_] = {
-    val recoveryMode = livyConf.get(LivyConf.RECOVERY_MODE)
-    recoveryMode match {
-      case SESSION_RECOVERY_MODE_OFF => classOf[BlackholeStateStore]
-      case SESSION_RECOVERY_MODE_RECOVERY => recoveryStoreClass(livyConf)
-      case rm => throw new IllegalArgumentException(s"Unsupported recovery mode $rm")
-    }
-  }
-
-  // Only consulted in recovery mode: resolves the state store name to its class.
-  private def recoveryStoreClass(livyConf: LivyConf): Class[_] = {
-    livyConf.get(LivyConf.RECOVERY_STATE_STORE) match {
-      case "filesystem" => classOf[FileSystemStateStore]
-      case "zookeeper" => classOf[ZooKeeperStateStore]
-      case ss => throw new IllegalArgumentException(s"Unsupported state store $ss")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala
deleted file mode 100644
index 753ea22..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.server.recovery
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-import scala.util.Try
-
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.framework.api.UnhandledErrorListener
-import org.apache.curator.retry.RetryNTimes
-import org.apache.zookeeper.KeeperException.NoNodeException
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.LivyConf.Entry
-
-object ZooKeeperStateStore {
-  // Prefix for every key written to ZooKeeper: nodes are addressed as /<prefix>/<key>.
-  val ZK_KEY_PREFIX_CONF = Entry("livy.server.recovery.zk-state-store.key-prefix", "livy")
-  // Retry policy in the form "<max retry count>,<sleep ms between retry>", e.g. "5,100".
-  val ZK_RETRY_CONF = Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100")
-}
-
-/**
- * StateStore that keeps every key as a ZooKeeper node at /<key prefix>/<key>, accessed
- * through a Curator client.
- *
- * @param livyConf Livy configuration; LivyConf.RECOVERY_STATE_STORE_URL must not be empty.
- * @param mockCuratorClient when defined, used instead of a client created for that address.
- */
-class ZooKeeperStateStore(
-    livyConf: LivyConf,
-    mockCuratorClient: Option[CuratorFramework] = None) // For testing
-  extends StateStore(livyConf) with Logging {
-
-  import ZooKeeperStateStore._
-
-  // Constructor defined for StateStore factory to new this class using reflection.
-  def this(livyConf: LivyConf) {
-    this(livyConf, None)
-  }
-
-  private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
-  require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
-  private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF)
-  private val retryValue = livyConf.get(ZK_RETRY_CONF)
-  // a regex to match patterns like "m, n" where m and n both are integer values
-  private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
-  private[recovery] val retryPolicy = retryValue match {
-    case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
-    // Name the retry-policy entry, since that is the value that failed to parse.
-    case _ => throw new IllegalArgumentException(
-      s"${ZK_RETRY_CONF.key} contains bad value: $retryValue. " +
-        "Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
-  }
-
-  private val curatorClient = mockCuratorClient.getOrElse {
-    CuratorFrameworkFactory.newClient(zkAddress, retryPolicy)
-  }
-
-  // Close the client when the JVM shuts down.
-  Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
-    override def run(): Unit = {
-      curatorClient.close()
-    }
-  }))
-
-  // An unhandled Curator error is treated as fatal: log what was reported, then exit.
-  curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener {
-    def unhandledError(message: String, e: Throwable): Unit = {
-      error(s"Fatal Zookeeper error: $message ($e). Shutting down Livy server.")
-      System.exit(1)
-    }
-  })
-  curatorClient.start()
-  // TODO Make sure ZK path has proper secure permissions so that other users cannot read its
-  // contents.
-
-  /** Serializes `value` and writes it to the key's node, creating the node (and parents) if absent. */
-  override def set(key: String, value: Object): Unit = {
-    val prefixedKey = prefixKey(key)
-    val data = serializeToBytes(value)
-    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
-      curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data)
-    } else {
-      curatorClient.setData().forPath(prefixedKey, data)
-    }
-  }
-
-  /** Reads and deserializes the key's node; None when the node does not exist. */
-  override def get[T: ClassTag](key: String): Option[T] = {
-    val prefixedKey = prefixKey(key)
-    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
-      None
-    } else {
-      Option(deserialize[T](curatorClient.getData().forPath(prefixedKey)))
-    }
-  }
-
-  /** Names of the direct children of the key's node; empty when the node does not exist. */
-  override def getChildren(key: String): Seq[String] = {
-    val prefixedKey = prefixKey(key)
-    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
-      Seq.empty[String]
-    } else {
-      curatorClient.getChildren.forPath(prefixedKey).asScala
-    }
-  }
-
-  /** Deletes the key's node; a node that does not exist is not an error. */
-  override def remove(key: String): Unit = {
-    try {
-      curatorClient.delete().guaranteed().forPath(prefixKey(key))
-    } catch {
-      case _: NoNodeException =>
-    }
-  }
-
-  // Full ZooKeeper path of a key: /<configured key prefix>/<key>.
-  private def prefixKey(key: String) = s"/$zkKeyPrefix/$key"
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/ui/UIServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/ui/UIServlet.scala b/server/src/main/scala/com/cloudera/livy/server/ui/UIServlet.scala
deleted file mode 100644
index 26c5e26..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/ui/UIServlet.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.ui
-
-import scala.xml.Node
-
-import org.scalatra.ScalatraServlet
-
-import com.cloudera.livy.LivyConf
-
-/** Servlet that renders the Livy web UI as HTML built from Scala XML literals. */
-class UIServlet extends ScalatraServlet {
-  // Every response of this servlet is sent with the text/html content type.
-  before() { contentType = "text/html" }
-
-  /** Builds the <head> element: the static CSS/JS includes followed by the page title. */
-  def getHeader(title: String): Seq[Node] =
-    <head>
-      <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css"/>
-      <link rel="stylesheet" href="/static/dataTables.bootstrap.min.css" type="text/css"/>
-      <link rel="stylesheet" href="/static/livy-ui.css" type="text/css"/>
-      <script src="/static/jquery-3.2.1.min.js"></script>
-      <script src="/static/bootstrap.min.js"></script>
-      <script src="/static/jquery.dataTables.min.js"></script>
-      <script src="/static/dataTables.bootstrap.min.js"></script>
-      <script src="/static/all-sessions.js"></script>
-      <title>{title}</title>
-    </head>
-
-  /** Builds the navigation bar: the Livy logo plus a single entry showing `pageName`. */
-  def navBar(pageName: String): Seq[Node] =
-    <nav class="navbar navbar-default">
-      <div class="container-fluid">
-        <div class="navbar-header">
-          <a class="navbar-brand" href="#">
-            <img alt="Livy" src="/static/livy-mini-logo.png"/>
-          </a>
-        </div>
-        <div class="collapse navbar-collapse">
-          <ul class="nav navbar-nav">
-            <li><a href="#">{pageName}</a></li>
-          </ul>
-        </div>
-      </div>
-    </nav>
-
-  /**
-   * Assembles a full page: a header titled "Livy - <pageName>", then a body holding the
-   * navigation bar followed by `pageContents`.
-   */
-  def createPage(pageName: String, pageContents: Seq[Node]): Seq[Node] =
-    <html>
-      {getHeader("Livy - " + pageName)}
-      <body>
-        <div class="container">
-          {navBar(pageName)}
-          {pageContents}
-        </div>
-      </body>
-    </html>
-
-  // Root page: the "Sessions" page, with empty placeholder divs for interactive sessions and
-  // batches (presumably filled in client-side by the scripts included in the header).
-  get("/") {
-    val content =
-      <div id="all-sessions">
-        <div id="interactive-sessions"></div>
-        <div id="batches"></div>
-      </div>
-
-    createPage("Sessions", content)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala b/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
deleted file mode 100644
index af9ae73..0000000
--- a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.sessions
-
-import java.io.InputStream
-import java.net.{URI, URISyntaxException}
-import java.security.PrivilegedExceptionAction
-import java.util.UUID
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.security.UserGroupInformation
-
-import com.cloudera.livy.{LivyConf, Logging, Utils}
-import com.cloudera.livy.utils.AppInfo
-
-object Session {
-  trait RecoveryMetadata { val id: Int }
-
-  lazy val configBlackList: Set[String] = {
-    val url = getClass.getResource("/spark-blacklist.conf")
-    if (url != null) Utils.loadProperties(url).keySet else Set()
-  }
-
-  /**
-   * Validates and prepares a user-provided configuration for submission.
-   *
-   * - Verifies that no blacklisted configurations are provided.
-   * - Merges file lists in the configuration with the explicit lists provided in the request.
-   * - Resolves file URIs to make sure they reference the default FS.
-   * - Verifies that file URIs don't reference non-whitelisted local resources.
-   */
-  def prepareConf(
-      conf: Map[String, String],
-      jars: Seq[String],
-      files: Seq[String],
-      archives: Seq[String],
-      pyFiles: Seq[String],
-      livyConf: LivyConf): Map[String, String] = {
-    if (conf == null) {
-      return Map()
-    }
-
-    val errors = conf.keySet.filter(configBlackList.contains)
-    if (errors.nonEmpty) {
-      throw new IllegalArgumentException(
-        "Blacklisted configuration values in session config: " + errors.mkString(", "))
-    }
-
-    val confLists: Map[String, Seq[String]] = livyConf.sparkFileLists
-      .map { key => (key -> Nil) }.toMap
-
-    val userLists = confLists ++ Map(
-      LivyConf.SPARK_JARS -> jars,
-      LivyConf.SPARK_FILES -> files,
-      LivyConf.SPARK_ARCHIVES -> archives,
-      LivyConf.SPARK_PY_FILES -> pyFiles)
-
-    val merged = userLists.flatMap { case (key, list) =>
-      val confList = conf.get(key)
-        .map { list =>
-          resolveURIs(list.split("[, ]+").toSeq, livyConf)
-        }
-        .getOrElse(Nil)
-      val userList = resolveURIs(list, livyConf)
-      if (confList.nonEmpty || userList.nonEmpty) {
-        Some(key -> (userList ++ confList).mkString(","))
-      } else {
-        None
-      }
-    }
-
-    val masterConfList = Map(LivyConf.SPARK_MASTER -> livyConf.sparkMaster()) ++
-      livyConf.sparkDeployMode().map(LivyConf.SPARK_DEPLOY_MODE -> _).toMap
-
-    conf ++ masterConfList ++ merged
-  }
-
-  /**
-   * Prepends the value of the "fs.defaultFS" configuration to any URIs that do not have a
-   * scheme. URIs are required to at least be absolute paths.
-   *
-   * @throws IllegalArgumentException If an invalid URI is found in the given list.
-   */
-  def resolveURIs(uris: Seq[String], livyConf: LivyConf): Seq[String] = {
-    val defaultFS = livyConf.hadoopConf.get("fs.defaultFS").stripSuffix("/")
-    uris.filter(_.nonEmpty).map { _uri =>
-      val uri = try {
-        new URI(_uri)
-      } catch {
-        case e: URISyntaxException => throw new IllegalArgumentException(e)
-      }
-      resolveURI(uri, livyConf).toString()
-    }
-  }
-
-  def resolveURI(uri: URI, livyConf: LivyConf): URI = {
-    val defaultFS = livyConf.hadoopConf.get("fs.defaultFS").stripSuffix("/")
-    val resolved =
-      if (uri.getScheme() == null) {
-        require(uri.getPath().startsWith("/"), s"Path '${uri.getPath()}' is not absolute.")
-        new URI(defaultFS + uri.getPath())
-      } else {
-        uri
-      }
-
-    if (resolved.getScheme() == "file") {
-      // Make sure the location is whitelisted before allowing local files to be added.
-      require(livyConf.localFsWhitelist.find(resolved.getPath().startsWith).isDefined,
-        s"Local path ${uri.getPath()} cannot be added to user sessions.")
-    }
-
-    resolved
-  }
-}
-
-abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf)
-  extends Logging {
-
-  import Session._
-
-  protected implicit val executionContext = ExecutionContext.global
-
-  protected var _appId: Option[String] = None
-
-  private var _lastActivity = System.nanoTime()
-
-  // Directory where the session's staging files are created. The directory is only accessible
-  // to the session's effective user.
-  private var stagingDir: Path = null
-
-  def appId: Option[String] = _appId
-
-  var appInfo: AppInfo = AppInfo()
-
-  def lastActivity: Long = state match {
-    case SessionState.Error(time) => time
-    case SessionState.Dead(time) => time
-    case SessionState.Success(time) => time
-    case _ => _lastActivity
-  }
-
-  def logLines(): IndexedSeq[String]
-
-  def recordActivity(): Unit = {
-    _lastActivity = System.nanoTime()
-  }
-
-  def recoveryMetadata: RecoveryMetadata
-
-  def state: SessionState
-
-  def stop(): Future[Unit] = Future {
-    try {
-      info(s"Stopping $this...")
-      stopSession()
-      info(s"Stopped $this.")
-    } catch {
-      case e: Exception =>
-        warn(s"Error stopping session $id.", e)
-    }
-
-    try {
-      if (stagingDir != null) {
-        debug(s"Deleting session $id staging directory $stagingDir")
-        doAsOwner {
-          val fs = FileSystem.newInstance(livyConf.hadoopConf)
-          try {
-            fs.delete(stagingDir, true)
-          } finally {
-            fs.close()
-          }
-        }
-      }
-    } catch {
-      case e: Exception =>
-        warn(s"Error cleaning up session $id staging dir.", e)
-    }
-  }
-
-
-  override def toString(): String = s"${this.getClass.getSimpleName} $id"
-
-  protected def stopSession(): Unit
-
-  protected val proxyUser: Option[String]
-
-  protected def doAsOwner[T](fn: => T): T = {
-    val user = proxyUser.getOrElse(owner)
-    if (user != null) {
-      val ugi = if (UserGroupInformation.isSecurityEnabled) {
-        if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
-          UserGroupInformation.createProxyUser(user, UserGroupInformation.getCurrentUser())
-        } else {
-          UserGroupInformation.getCurrentUser()
-        }
-      } else {
-        UserGroupInformation.createRemoteUser(user)
-      }
-      ugi.doAs(new PrivilegedExceptionAction[T] {
-        override def run(): T = fn
-      })
-    } else {
-      fn
-    }
-  }
-
-  protected def copyResourceToHDFS(dataStream: InputStream, name: String): URI = doAsOwner {
-    val fs = FileSystem.newInstance(livyConf.hadoopConf)
-
-    try {
-      val filePath = new Path(getStagingDir(fs), name)
-      debug(s"Uploading user file to $filePath")
-
-      val outFile = fs.create(filePath, true)
-      val buffer = new Array[Byte](512 * 1024)
-      var read = -1
-      try {
-        while ({read = dataStream.read(buffer); read != -1}) {
-          outFile.write(buffer, 0, read)
-        }
-      } finally {
-        outFile.close()
-      }
-      filePath.toUri
-    } finally {
-      fs.close()
-    }
-  }
-
-  private def getStagingDir(fs: FileSystem): Path = synchronized {
-    if (stagingDir == null) {
-      val stagingRoot = Option(livyConf.get(LivyConf.SESSION_STAGING_DIR)).getOrElse {
-        new Path(fs.getHomeDirectory(), ".livy-sessions").toString()
-      }
-
-      val sessionDir = new Path(stagingRoot, UUID.randomUUID().toString())
-      fs.mkdirs(sessionDir)
-      fs.setPermission(sessionDir, new FsPermission("700"))
-      stagingDir = sessionDir
-      debug(s"Session $id staging directory is $stagingDir")
-    }
-    stagingDir
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
deleted file mode 100644
index 93f162e..0000000
--- a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.sessions
-
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.concurrent.duration.Duration
-import scala.reflect.ClassTag
-import scala.util.control.NonFatal
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
-import com.cloudera.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession, SessionHeartbeatWatchdog}
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions.Session.RecoveryMetadata
-
-object SessionManager {
-  val SESSION_RECOVERY_MODE_OFF = "off"
-  val SESSION_RECOVERY_MODE_RECOVERY = "recovery"
-}
-
-class BatchSessionManager(
-    livyConf: LivyConf,
-    sessionStore: SessionStore,
-    mockSessions: Option[Seq[BatchSession]] = None)
-  extends SessionManager[BatchSession, BatchRecoveryMetadata] (
-    livyConf, BatchSession.recover(_, livyConf, sessionStore), sessionStore, "batch", mockSessions)
-
-class InteractiveSessionManager(
-  livyConf: LivyConf,
-  sessionStore: SessionStore,
-  mockSessions: Option[Seq[InteractiveSession]] = None)
-  extends SessionManager[InteractiveSession, InteractiveRecoveryMetadata] (
-    livyConf,
-    InteractiveSession.recover(_, livyConf, sessionStore),
-    sessionStore,
-    "interactive",
-    mockSessions)
-  with SessionHeartbeatWatchdog[InteractiveSession, InteractiveRecoveryMetadata]
-  {
-    start()
-  }
-
-class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
-    protected val livyConf: LivyConf,
-    sessionRecovery: R => S,
-    sessionStore: SessionStore,
-    sessionType: String,
-    mockSessions: Option[Seq[S]] = None)
-  extends Logging {
-
-  import SessionManager._
-
-  protected implicit def executor: ExecutionContext = ExecutionContext.global
-
-  protected[this] final val idCounter = new AtomicInteger(0)
-  protected[this] final val sessions = mutable.LinkedHashMap[Int, S]()
-
-  private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK)
-  private[this] final val sessionTimeout =
-    TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT))
-  private[this] final val sessionStateRetainedInSec =
-    TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME))
-
-  mockSessions.getOrElse(recover()).foreach(register)
-  new GarbageCollector().start()
-
-  def nextId(): Int = synchronized {
-    val id = idCounter.getAndIncrement()
-    sessionStore.saveNextSessionId(sessionType, idCounter.get())
-    id
-  }
-
-  def register(session: S): S = {
-    info(s"Registering new session ${session.id}")
-    synchronized {
-      sessions.put(session.id, session)
-    }
-    session
-  }
-
-  def get(id: Int): Option[S] = sessions.get(id)
-
-  def size(): Int = sessions.size
-
-  def all(): Iterable[S] = sessions.values
-
-  def delete(id: Int): Option[Future[Unit]] = {
-    get(id).map(delete)
-  }
-
-  def delete(session: S): Future[Unit] = {
-    session.stop().map { case _ =>
-      try {
-        sessionStore.remove(sessionType, session.id)
-        synchronized {
-          sessions.remove(session.id)
-        }
-      } catch {
-        case NonFatal(e) =>
-          error("Exception was thrown during stop session:", e)
-          throw e
-      }
-    }
-  }
-
-  def shutdown(): Unit = {
-    val recoveryEnabled = livyConf.get(LivyConf.RECOVERY_MODE) != SESSION_RECOVERY_MODE_OFF
-    if (!recoveryEnabled) {
-      sessions.values.map(_.stop).foreach { future =>
-        Await.ready(future, Duration.Inf)
-      }
-    }
-  }
-
-  def collectGarbage(): Future[Iterable[Unit]] = {
-    def expired(session: Session): Boolean = {
-      session.state match {
-        case s: FinishedSessionState =>
-          val currentTime = System.nanoTime()
-          currentTime - s.time > sessionStateRetainedInSec
-        case _ =>
-          if (!sessionTimeoutCheck) {
-            false
-          } else if (session.isInstanceOf[BatchSession]) {
-            false
-          } else {
-            val currentTime = System.nanoTime()
-            currentTime - session.lastActivity > sessionTimeout
-          }
-      }
-    }
-
-    Future.sequence(all().filter(expired).map(delete))
-  }
-
-  private def recover(): Seq[S] = {
-    // Recover next session id from state store and create SessionManager.
-    idCounter.set(sessionStore.getNextSessionId(sessionType))
-
-    // Retrieve session recovery metadata from state store.
-    val sessionMetadata = sessionStore.getAllSessions[R](sessionType)
-
-    // Recover session from session recovery metadata.
-    val recoveredSessions = sessionMetadata.flatMap(_.toOption).map(sessionRecovery)
-
-    info(s"Recovered ${recoveredSessions.length} $sessionType sessions." +
-      s" Next session id: $idCounter")
-
-    // Print recovery error.
-    val recoveryFailure = sessionMetadata.filter(_.isFailure).map(_.failed.get)
-    recoveryFailure.foreach(ex => error(ex.getMessage, ex.getCause))
-
-    recoveredSessions
-  }
-
-  private class GarbageCollector extends Thread("session gc thread") {
-
-    setDaemon(true)
-
-    override def run(): Unit = {
-      while (true) {
-        collectGarbage()
-        Thread.sleep(60 * 1000)
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/utils/Clock.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/Clock.scala b/server/src/main/scala/com/cloudera/livy/utils/Clock.scala
deleted file mode 100644
index 76ad9d4..0000000
--- a/server/src/main/scala/com/cloudera/livy/utils/Clock.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.utils
-
-/**
- * A lot of Livy code relies on time-related functions like Thread.sleep.
- * To remove timing effects from unit tests, this class is created to mock out time.
- *
- * Code in Livy should not call Thread.sleep() directly. It should call this class instead.
- */
-object Clock {
-  private var _sleep: Long => Unit = Thread.sleep
-
-  def withSleepMethod(mockSleep: Long => Unit)(f: => Unit): Unit = {
-    try {
-      _sleep = mockSleep
-      f
-    } finally {
-      _sleep = Thread.sleep
-    }
-  }
-
-  def sleep: Long => Unit = _sleep
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala
deleted file mode 100644
index a07ab4b..0000000
--- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.util
-
-import com.cloudera.livy.{Logging, Utils}
-
-class LineBufferedProcess(process: Process) extends Logging {
-
-  private[this] val _inputStream = new LineBufferedStream(process.getInputStream)
-  private[this] val _errorStream = new LineBufferedStream(process.getErrorStream)
-
-  def inputLines: IndexedSeq[String] = _inputStream.lines
-  def errorLines: IndexedSeq[String] = _errorStream.lines
-
-  def inputIterator: Iterator[String] = _inputStream.iterator
-  def errorIterator: Iterator[String] = _errorStream.iterator
-
-  def destroy(): Unit = {
-    process.destroy()
-  }
-
-  /** Returns whether the process is still actively running. */
-  def isAlive: Boolean = Utils.isProcessAlive(process)
-
-  def exitValue(): Int = {
-    process.exitValue()
-  }
-
-  def waitFor(): Int = {
-    val returnCode = process.waitFor()
-    _inputStream.waitUntilClose()
-    _errorStream.waitUntilClose()
-    returnCode
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
deleted file mode 100644
index 5f79ca1..0000000
--- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.util
-
-import java.io.InputStream
-import java.util.concurrent.locks.ReentrantLock
-
-import scala.io.Source
-
-import com.cloudera.livy.Logging
-
-class LineBufferedStream(inputStream: InputStream) extends Logging {
-
-  private[this] var _lines: IndexedSeq[String] = IndexedSeq()
-
-  private[this] val _lock = new ReentrantLock()
-  private[this] val _condition = _lock.newCondition()
-  private[this] var _finished = false
-
-  private val thread = new Thread {
-    override def run() = {
-      val lines = Source.fromInputStream(inputStream).getLines()
-      for (line <- lines) {
-        _lock.lock()
-        try {
-          _lines = _lines :+ line
-          _condition.signalAll()
-        } finally {
-          _lock.unlock()
-        }
-      }
-
-      _lines.map { line => info("stdout: ", line) }
-      _lock.lock()
-      try {
-        _finished = true
-        _condition.signalAll()
-      } finally {
-        _lock.unlock()
-      }
-    }
-  }
-  thread.setDaemon(true)
-  thread.start()
-
-  def lines: IndexedSeq[String] = _lines
-
-  def iterator: Iterator[String] = {
-    new LinesIterator
-  }
-
-  def waitUntilClose(): Unit = thread.join()
-
-  private class LinesIterator extends Iterator[String] {
-    private[this] var index = 0
-
-    override def hasNext: Boolean = {
-      if (index < _lines.length) {
-        true
-      } else {
-        // Otherwise we might still have more data.
-        _lock.lock()
-        try {
-          if (_finished) {
-            false
-          } else {
-            _condition.await()
-            index < _lines.length
-          }
-        } finally {
-          _lock.unlock()
-        }
-      }
-    }
-
-    override def next(): String = {
-      val line = _lines(index)
-      index += 1
-      line
-    }
-  }
-}