You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:35 UTC

[06/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/WebServer.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/WebServer.scala b/server/src/main/scala/org/apache/livy/server/WebServer.scala
new file mode 100644
index 0000000..549d6ab
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/WebServer.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server
+
+import java.net.InetAddress
+import javax.servlet.ServletContextListener
+
+import org.eclipse.jetty.server._
+import org.eclipse.jetty.server.handler.{HandlerCollection, RequestLogHandler}
+import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler}
+import org.eclipse.jetty.util.ssl.SslContextFactory
+
+import org.apache.livy.{LivyConf, Logging}
+
+class WebServer(livyConf: LivyConf, var host: String, var port: Int) extends Logging {
+  val server = new Server()
+
+  server.setStopTimeout(1000)
+  server.setStopAtShutdown(true)
+
+  val (connector, protocol) = Option(livyConf.get(LivyConf.SSL_KEYSTORE)) match {
+    case None =>
+      val http = new HttpConfiguration()
+      http.setRequestHeaderSize(livyConf.getInt(LivyConf.REQUEST_HEADER_SIZE))
+      http.setResponseHeaderSize(livyConf.getInt(LivyConf.RESPONSE_HEADER_SIZE))
+      (new ServerConnector(server, new HttpConnectionFactory(http)), "http")
+
+    case Some(keystore) =>
+      val https = new HttpConfiguration()
+      https.setRequestHeaderSize(livyConf.getInt(LivyConf.REQUEST_HEADER_SIZE))
+      https.setResponseHeaderSize(livyConf.getInt(LivyConf.RESPONSE_HEADER_SIZE))
+      https.addCustomizer(new SecureRequestCustomizer())
+
+      val sslContextFactory = new SslContextFactory()
+      sslContextFactory.setKeyStorePath(keystore)
+      Option(livyConf.get(LivyConf.SSL_KEYSTORE_PASSWORD))
+        .foreach(sslContextFactory.setKeyStorePassword)
+      Option(livyConf.get(LivyConf.SSL_KEY_PASSWORD))
+        .foreach(sslContextFactory.setKeyManagerPassword)
+
+      (new ServerConnector(server,
+        new SslConnectionFactory(sslContextFactory, "http/1.1"),
+        new HttpConnectionFactory(https)), "https")
+  }
+
+  connector.setHost(host)
+  connector.setPort(port)
+
+  server.setConnectors(Array(connector))
+
+  val context = new ServletContextHandler()
+
+  context.setContextPath("/")
+  context.addServlet(classOf[DefaultServlet], "/")
+
+  val handlers = new HandlerCollection
+  handlers.addHandler(context)
+
+  // Configure the access log
+  val requestLogHandler = new RequestLogHandler
+  val requestLog = new NCSARequestLog(sys.env.getOrElse("LIVY_LOG_DIR",
+    sys.env("LIVY_HOME") + "/logs") + "/yyyy_mm_dd.request.log")
+  requestLog.setAppend(true)
+  requestLog.setExtended(false)
+  requestLog.setLogTimeZone("GMT")
+  requestLog.setRetainDays(livyConf.getInt(LivyConf.REQUEST_LOG_RETAIN_DAYS))
+  requestLogHandler.setRequestLog(requestLog)
+  handlers.addHandler(requestLogHandler)
+
+  server.setHandler(handlers)
+
+  def addEventListener(listener: ServletContextListener): Unit = {
+    context.addEventListener(listener)
+  }
+
+  def start(): Unit = {
+    server.start()
+
+    val connector = server.getConnectors()(0).asInstanceOf[NetworkConnector]
+
+    if (host == "0.0.0.0") {
+      host = InetAddress.getLocalHost.getCanonicalHostName
+    }
+    port = connector.getLocalPort
+
+    info("Starting server on %s://%s:%d" format (protocol, host, port))
+  }
+
+  def join(): Unit = {
+    server.join()
+  }
+
+  def stop(): Unit = {
+    context.stop()
+    server.stop()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
new file mode 100644
index 0000000..2605bf5
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.batch
+
+import java.lang.ProcessBuilder.Redirect
+
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.util.Random
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+
+import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions.{Session, SessionState}
+import org.apache.livy.sessions.Session._
+import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class BatchRecoveryMetadata(
+    id: Int,
+    appId: Option[String],
+    appTag: String,
+    owner: String,
+    proxyUser: Option[String],
+    version: Int = 1)
+  extends RecoveryMetadata
+
+object BatchSession extends Logging {
+  val RECOVERY_SESSION_TYPE = "batch"
+
+  def create(
+      id: Int,
+      request: CreateBatchRequest,
+      livyConf: LivyConf,
+      owner: String,
+      proxyUser: Option[String],
+      sessionStore: SessionStore,
+      mockApp: Option[SparkApp] = None): BatchSession = {
+    val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}"
+
+    def createSparkApp(s: BatchSession): SparkApp = {
+      val conf = SparkApp.prepareSparkConf(
+        appTag,
+        livyConf,
+        prepareConf(
+          request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
+      require(request.file != null, "File is required.")
+
+      val builder = new SparkProcessBuilder(livyConf)
+      builder.conf(conf)
+
+      proxyUser.foreach(builder.proxyUser)
+      request.className.foreach(builder.className)
+      request.driverMemory.foreach(builder.driverMemory)
+      request.driverCores.foreach(builder.driverCores)
+      request.executorMemory.foreach(builder.executorMemory)
+      request.executorCores.foreach(builder.executorCores)
+      request.numExecutors.foreach(builder.numExecutors)
+      request.queue.foreach(builder.queue)
+      request.name.foreach(builder.name)
+
+      // Spark 1.x does not support specifying deploy mode in conf and needs special handling.
+      livyConf.sparkDeployMode().foreach(builder.deployMode)
+
+      sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata)
+
+      builder.redirectOutput(Redirect.PIPE)
+      builder.redirectErrorStream(true)
+
+      val file = resolveURIs(Seq(request.file), livyConf)(0)
+      val sparkSubmit = builder.start(Some(file), request.args)
+
+      SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s))
+    }
+
+    info(s"Creating batch session $id: [owner: $owner, request: $request]")
+
+    new BatchSession(
+      id,
+      appTag,
+      SessionState.Starting(),
+      livyConf,
+      owner,
+      proxyUser,
+      sessionStore,
+      mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
+  }
+
+  def recover(
+      m: BatchRecoveryMetadata,
+      livyConf: LivyConf,
+      sessionStore: SessionStore,
+      mockApp: Option[SparkApp] = None): BatchSession = {
+    new BatchSession(
+      m.id,
+      m.appTag,
+      SessionState.Recovering(),
+      livyConf,
+      m.owner,
+      m.proxyUser,
+      sessionStore,
+      mockApp.map { m => (_: BatchSession) => m }.getOrElse { s =>
+        SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
+      })
+  }
+}
+
+class BatchSession(
+    id: Int,
+    appTag: String,
+    initialState: SessionState,
+    livyConf: LivyConf,
+    owner: String,
+    override val proxyUser: Option[String],
+    sessionStore: SessionStore,
+    sparkApp: BatchSession => SparkApp)
+  extends Session(id, owner, livyConf) with SparkAppListener {
+  import BatchSession._
+
+  protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global
+
+  private[this] var _state: SessionState = initialState
+  private val app = sparkApp(this)
+
+  override def state: SessionState = _state
+
+  override def logLines(): IndexedSeq[String] = app.log()
+
+  override def stopSession(): Unit = {
+    app.kill()
+  }
+
+  override def appIdKnown(appId: String): Unit = {
+    _appId = Option(appId)
+    sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
+  }
+
+  override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
+    synchronized {
+      debug(s"$this state changed from $oldState to $newState")
+      newState match {
+        case SparkApp.State.RUNNING =>
+          _state = SessionState.Running()
+          info(s"Batch session $id created [appid: ${appId.orNull}, state: ${state.toString}, " +
+            s"info: ${appInfo.asJavaMap}]")
+        case SparkApp.State.FINISHED => _state = SessionState.Success()
+        case SparkApp.State.KILLED | SparkApp.State.FAILED =>
+          _state = SessionState.Dead()
+        case _ =>
+      }
+    }
+  }
+
+  override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
+
+  override def recoveryMetadata: RecoveryMetadata =
+    BatchRecoveryMetadata(id, appId, appTag, owner, proxyUser)
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
new file mode 100644
index 0000000..db98a24
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.batch
+
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.livy.LivyConf
+import org.apache.livy.server.SessionServlet
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions.BatchSessionManager
+import org.apache.livy.utils.AppInfo
+
+case class BatchSessionView(
+  id: Long,
+  state: String,
+  appId: Option[String],
+  appInfo: AppInfo,
+  log: Seq[String])
+
+class BatchSessionServlet(
+    sessionManager: BatchSessionManager,
+    sessionStore: SessionStore,
+    livyConf: LivyConf)
+  extends SessionServlet(sessionManager, livyConf)
+{
+
+  override protected def createSession(req: HttpServletRequest): BatchSession = {
+    val createRequest = bodyAs[CreateBatchRequest](req)
+    val proxyUser = checkImpersonation(createRequest.proxyUser, req)
+    BatchSession.create(
+      sessionManager.nextId(), createRequest, livyConf, remoteUser(req), proxyUser, sessionStore)
+  }
+
+  override protected[batch] def clientSessionView(
+      session: BatchSession,
+      req: HttpServletRequest): Any = {
+    val logs =
+      if (hasAccess(session.owner, req)) {
+        val lines = session.logLines()
+
+        val size = 10
+        val from = math.max(0, lines.length - size)
+        val until = from + size
+
+        lines.view(from, until).toSeq
+      } else {
+        Nil
+      }
+    BatchSessionView(session.id, session.state.toString, session.appId, session.appInfo, logs)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/batch/CreateBatchRequest.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/batch/CreateBatchRequest.scala b/server/src/main/scala/org/apache/livy/server/batch/CreateBatchRequest.scala
new file mode 100644
index 0000000..53b5e1b
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/batch/CreateBatchRequest.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.batch
+
+class CreateBatchRequest {
+
+  var file: String = _
+  var proxyUser: Option[String] = None
+  var args: List[String] = List()
+  var className: Option[String] = None
+  var jars: List[String] = List()
+  var pyFiles: List[String] = List()
+  var files: List[String] = List()
+  var driverMemory: Option[String] = None
+  var driverCores: Option[Int] = None
+  var executorMemory: Option[String] = None
+  var executorCores: Option[Int] = None
+  var numExecutors: Option[Int] = None
+  var archives: List[String] = List()
+  var queue: Option[String] = None
+  var name: Option[String] = None
+  var conf: Map[String, String] = Map()
+
+  override def toString: String = {
+    s"[proxyUser: $proxyUser, " +
+      s"file: $file, " +
+      (if (args.nonEmpty) s"args: ${args.mkString(",")}, " else "") +
+      (if (jars.nonEmpty) s"jars: ${jars.mkString(",")}, " else "") +
+      (if (pyFiles.nonEmpty) s"pyFiles: ${pyFiles.mkString(",")}, " else "") +
+      (if (files.nonEmpty) s"files: ${files.mkString(",")}, " else "") +
+      (if (archives.nonEmpty) s"archives: ${archives.mkString(",")}, " else "") +
+      (if (driverMemory.isDefined) s"driverMemory: ${driverMemory.get}, " else "") +
+      (if (driverCores.isDefined) s"driverCores: ${driverCores.get}, " else "") +
+      (if (executorMemory.isDefined) s"executorMemory: ${executorMemory.get}, " else "") +
+      (if (executorCores.isDefined) s"executorCores: ${executorCores.get}, " else "") +
+      (if (numExecutors.isDefined) s"numExecutors: ${numExecutors.get}, " else "") +
+      (if (queue.isDefined) s"queue: ${queue.get}, " else "") +
+      (if (name.isDefined) s"name: ${name.get}, " else "") +
+      (if (conf.nonEmpty) s"conf: ${conf.mkString(",")}]" else "]")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
new file mode 100644
index 0000000..bbb7abd
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.interactive
+
+import org.apache.livy.sessions.{Kind, Spark}
+
+class CreateInteractiveRequest {
+  var kind: Kind = Spark()
+  var proxyUser: Option[String] = None
+  var jars: List[String] = List()
+  var pyFiles: List[String] = List()
+  var files: List[String] = List()
+  var driverMemory: Option[String] = None
+  var driverCores: Option[Int] = None
+  var executorMemory: Option[String] = None
+  var executorCores: Option[Int] = None
+  var numExecutors: Option[Int] = None
+  var archives: List[String] = List()
+  var queue: Option[String] = None
+  var name: Option[String] = None
+  var conf: Map[String, String] = Map()
+  var heartbeatTimeoutInSecond: Int = 0
+
+  override def toString: String = {
+    s"[kind: $kind, " +
+      s"proxyUser: $proxyUser, " +
+      (if (jars.nonEmpty) s"jars: ${jars.mkString(",")}, " else "") +
+      (if (pyFiles.nonEmpty) s"pyFiles: ${pyFiles.mkString(",")}, " else "") +
+      (if (files.nonEmpty) s"files: ${files.mkString(",")}, " else "") +
+      (if (archives.nonEmpty) s"archives: ${archives.mkString(",")}, " else "") +
+      (if (driverMemory.isDefined) s"driverMemory: ${driverMemory.get}, " else "") +
+      (if (driverCores.isDefined) s"driverCores: ${driverCores.get}, " else "") +
+      (if (executorMemory.isDefined) s"executorMemory: ${executorMemory.get}, " else "") +
+      (if (executorCores.isDefined) s"executorCores: ${executorCores.get}, " else "") +
+      (if (numExecutors.isDefined) s"numExecutors: ${numExecutors.get}, " else "") +
+      (if (queue.isDefined) s"queue: ${queue.get}, " else "") +
+      (if (name.isDefined) s"name: ${name.get}, " else "") +
+      (if (conf.nonEmpty) s"conf: ${conf.mkString(",")}, " else "") +
+      s"heartbeatTimeoutInSecond: $heartbeatTimeoutInSecond]"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
new file mode 100644
index 0000000..b466d85
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.interactive
+
+import java.io.{File, InputStream}
+import java.net.URI
+import java.nio.ByteBuffer
+import java.nio.file.{Files, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import com.google.common.annotations.VisibleForTesting
+import org.apache.hadoop.fs.Path
+import org.apache.spark.launcher.SparkLauncher
+
+import org.apache.livy._
+import org.apache.livy.client.common.HttpMessages._
+import org.apache.livy.rsc.{PingJob, RSCClient, RSCConf}
+import org.apache.livy.rsc.driver.Statement
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions._
+import org.apache.livy.sessions.Session._
+import org.apache.livy.sessions.SessionState.Dead
+import org.apache.livy.utils._
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class InteractiveRecoveryMetadata(
+    id: Int,
+    appId: Option[String],
+    appTag: String,
+    kind: Kind,
+    heartbeatTimeoutS: Int,
+    owner: String,
+    proxyUser: Option[String],
+    rscDriverUri: Option[URI],
+    version: Int = 1)
+  extends RecoveryMetadata
+
+object InteractiveSession extends Logging {
+  private[interactive] val SPARK_YARN_IS_PYTHON = "spark.yarn.isPython"
+
+  val RECOVERY_SESSION_TYPE = "interactive"
+
+  def create(
+      id: Int,
+      owner: String,
+      proxyUser: Option[String],
+      livyConf: LivyConf,
+      request: CreateInteractiveRequest,
+      sessionStore: SessionStore,
+      mockApp: Option[SparkApp] = None,
+      mockClient: Option[RSCClient] = None): InteractiveSession = {
+    val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}"
+
+    val client = mockClient.orElse {
+      val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
+        request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
+
+      val builderProperties = prepareBuilderProp(conf, request.kind, livyConf)
+
+      val userOpts: Map[String, Option[String]] = Map(
+        "spark.driver.cores" -> request.driverCores.map(_.toString),
+        SparkLauncher.DRIVER_MEMORY -> request.driverMemory.map(_.toString),
+        SparkLauncher.EXECUTOR_CORES -> request.executorCores.map(_.toString),
+        SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString),
+        "spark.executor.instances" -> request.numExecutors.map(_.toString),
+        "spark.app.name" -> request.name.map(_.toString),
+        "spark.yarn.queue" -> request.queue
+      )
+
+      userOpts.foreach { case (key, opt) =>
+        opt.foreach { value => builderProperties.put(key, value) }
+      }
+
+      builderProperties.getOrElseUpdate("spark.app.name", s"livy-session-$id")
+
+      info(s"Creating Interactive session $id: [owner: $owner, request: $request]")
+      val builder = new LivyClientBuilder()
+        .setAll(builderProperties.asJava)
+        .setConf("livy.client.session-id", id.toString)
+        .setConf(RSCConf.Entry.DRIVER_CLASS.key(), "org.apache.livy.repl.ReplDriver")
+        .setConf(RSCConf.Entry.PROXY_USER.key(), proxyUser.orNull)
+        .setURI(new URI("rsc:/"))
+
+      Option(builder.build().asInstanceOf[RSCClient])
+    }
+
+    new InteractiveSession(
+      id,
+      None,
+      appTag,
+      client,
+      SessionState.Starting(),
+      request.kind,
+      request.heartbeatTimeoutInSecond,
+      livyConf,
+      owner,
+      proxyUser,
+      sessionStore,
+      mockApp)
+  }
+
+  def recover(
+      metadata: InteractiveRecoveryMetadata,
+      livyConf: LivyConf,
+      sessionStore: SessionStore,
+      mockApp: Option[SparkApp] = None,
+      mockClient: Option[RSCClient] = None): InteractiveSession = {
+    val client = mockClient.orElse(metadata.rscDriverUri.map { uri =>
+      val builder = new LivyClientBuilder().setURI(uri)
+      builder.build().asInstanceOf[RSCClient]
+    })
+
+    new InteractiveSession(
+      metadata.id,
+      metadata.appId,
+      metadata.appTag,
+      client,
+      SessionState.Recovering(),
+      metadata.kind,
+      metadata.heartbeatTimeoutS,
+      livyConf,
+      metadata.owner,
+      metadata.proxyUser,
+      sessionStore,
+      mockApp)
+  }
+
+  @VisibleForTesting
+  private[interactive] def prepareBuilderProp(
+    conf: Map[String, String],
+    kind: Kind,
+    livyConf: LivyConf): mutable.Map[String, String] = {
+
+    val builderProperties = mutable.Map[String, String]()
+    builderProperties ++= conf
+
+    def livyJars(livyConf: LivyConf, scalaVersion: String): List[String] = {
+      Option(livyConf.get(LivyConf.REPL_JARS)).map { jars =>
+        val regex = """[\w-]+_(\d\.\d\d).*\.jar""".r
+        jars.split(",").filter { name => new Path(name).getName match {
+            // Filter out unmatched scala jars
+            case regex(ver) => ver == scalaVersion
+            // Keep all the java jars end with ".jar"
+            case _ => name.endsWith(".jar")
+          }
+        }.toList
+      }.getOrElse {
+        val home = sys.env("LIVY_HOME")
+        val jars = Option(new File(home, s"repl_$scalaVersion-jars"))
+          .filter(_.isDirectory())
+          .getOrElse(new File(home, s"repl/scala-$scalaVersion/target/jars"))
+        require(jars.isDirectory(), "Cannot find Livy REPL jars.")
+        jars.listFiles().map(_.getAbsolutePath()).toList
+      }
+    }
+
+    def findSparkRArchive(): Option[String] = {
+      Option(livyConf.get(RSCConf.Entry.SPARKR_PACKAGE.key())).orElse {
+        sys.env.get("SPARK_HOME").map { case sparkHome =>
+          val path = Seq(sparkHome, "R", "lib", "sparkr.zip").mkString(File.separator)
+          val rArchivesFile = new File(path)
+          require(rArchivesFile.exists(), "sparkr.zip not found; cannot run sparkr application.")
+          rArchivesFile.getAbsolutePath()
+        }
+      }
+    }
+
+    def datanucleusJars(livyConf: LivyConf, sparkMajorVersion: Int): Seq[String] = {
+      if (sys.env.getOrElse("LIVY_INTEGRATION_TEST", "false").toBoolean) {
+        // datanucleus jars has already been in classpath in integration test
+        Seq.empty
+      } else {
+        val sparkHome = livyConf.sparkHome().get
+        val libdir = sparkMajorVersion match {
+          case 1 =>
+            if (new File(sparkHome, "RELEASE").isFile) {
+              new File(sparkHome, "lib")
+            } else {
+              new File(sparkHome, "lib_managed/jars")
+            }
+          case 2 =>
+            if (new File(sparkHome, "RELEASE").isFile) {
+              new File(sparkHome, "jars")
+            } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) {
+              new File(sparkHome, "assembly/target/scala-2.11/jars")
+            } else {
+              new File(sparkHome, "assembly/target/scala-2.10/jars")
+            }
+          case v =>
+            throw new RuntimeException("Unsupported spark major version:" + sparkMajorVersion)
+        }
+        val jars = if (!libdir.isDirectory) {
+          Seq.empty[String]
+        } else {
+          libdir.listFiles().filter(_.getName.startsWith("datanucleus-"))
+            .map(_.getAbsolutePath).toSeq
+        }
+        if (jars.isEmpty) {
+          warn("datanucleus jars can not be found")
+        }
+        jars
+      }
+    }
+
+    /**
+     * Look for hive-site.xml (for now just ignore spark.files defined in spark-defaults.conf)
+     * 1. First look for hive-site.xml in user request
+     * 2. Then look for that under classpath
+     * @param livyConf
+     * @return  (hive-site.xml path, whether it is provided by user)
+     */
+    def hiveSiteFile(sparkFiles: Array[String], livyConf: LivyConf): (Option[File], Boolean) = {
+      if (sparkFiles.exists(_.split("/").last == "hive-site.xml")) {
+        (None, true)
+      } else {
+        val hiveSiteURL = getClass.getResource("/hive-site.xml")
+        if (hiveSiteURL != null && hiveSiteURL.getProtocol == "file") {
+          (Some(new File(hiveSiteURL.toURI)), false)
+        } else {
+          (None, false)
+        }
+      }
+    }
+
+    def findPySparkArchives(): Seq[String] = {
+      Option(livyConf.get(RSCConf.Entry.PYSPARK_ARCHIVES))
+        .map(_.split(",").toSeq)
+        .getOrElse {
+          sys.env.get("SPARK_HOME") .map { case sparkHome =>
+            val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
+            val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+            require(pyArchivesFile.exists(),
+              "pyspark.zip not found; cannot run pyspark application in YARN mode.")
+
+            val py4jFile = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip")
+              .iterator()
+              .next()
+              .toFile
+
+            require(py4jFile.exists(),
+              "py4j-*-src.zip not found; cannot run pyspark application in YARN mode.")
+            Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath)
+          }.getOrElse(Seq())
+        }
+    }
+
+    def mergeConfList(list: Seq[String], key: String): Unit = {
+      if (list.nonEmpty) {
+        builderProperties.get(key) match {
+          case None =>
+            builderProperties.put(key, list.mkString(","))
+          case Some(oldList) =>
+            val newList = (oldList :: list.toList).mkString(",")
+            builderProperties.put(key, newList)
+        }
+      }
+    }
+
+    def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = {
+      val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String])
+      hiveSiteFile(sparkFiles, livyConf) match {
+        case (_, true) =>
+          debug("Enable HiveContext because hive-site.xml is found in user request.")
+          mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
+        case (Some(file), false) =>
+          debug("Enable HiveContext because hive-site.xml is found under classpath, "
+            + file.getAbsolutePath)
+          mergeConfList(List(file.getAbsolutePath), LivyConf.SPARK_FILES)
+          mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
+        case (None, false) =>
+          warn("Enable HiveContext but no hive-site.xml found under" +
+            " classpath or user request.")
+      }
+    }
+
+    kind match {
+      case PySpark() | PySpark3() =>
+        val pySparkFiles = if (!LivyConf.TEST_MODE) findPySparkArchives() else Nil
+        mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES)
+        builderProperties.put(SPARK_YARN_IS_PYTHON, "true")
+      case SparkR() =>
+        val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
+        sparkRArchive.foreach { archive =>
+          builderProperties.put(RSCConf.Entry.SPARKR_PACKAGE.key(), archive + "#sparkr")
+        }
+      case _ =>
+    }
+    builderProperties.put(RSCConf.Entry.SESSION_KIND.key, kind.toString)
+
+    // Set Livy.rsc.jars from livy conf to rsc conf, RSC conf will take precedence if both are set.
+    Option(livyConf.get(LivyConf.RSC_JARS)).foreach(
+      builderProperties.getOrElseUpdate(RSCConf.Entry.LIVY_JARS.key(), _))
+
+    require(livyConf.get(LivyConf.LIVY_SPARK_VERSION) != null)
+    require(livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION) != null)
+
+    val (sparkMajorVersion, _) =
+      LivySparkUtils.formatSparkVersion(livyConf.get(LivyConf.LIVY_SPARK_VERSION))
+    val scalaVersion = livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION)
+
+    mergeConfList(livyJars(livyConf, scalaVersion), LivyConf.SPARK_JARS)
+    val enableHiveContext = livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
+    // pass spark.livy.spark_major_version to driver
+    builderProperties.put("spark.livy.spark_major_version", sparkMajorVersion.toString)
+
+    if (sparkMajorVersion <= 1) {
+      builderProperties.put("spark.repl.enableHiveContext",
+        livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT).toString)
+    } else {
+      val confVal = if (enableHiveContext) "hive" else "in-memory"
+      builderProperties.put("spark.sql.catalogImplementation", confVal)
+    }
+
+    if (enableHiveContext) {
+      mergeHiveSiteAndHiveDeps(sparkMajorVersion)
+    }
+
+    builderProperties
+  }
+}
+
+class InteractiveSession(
+    id: Int,
+    appIdHint: Option[String],
+    appTag: String,
+    client: Option[RSCClient],
+    initialState: SessionState,
+    val kind: Kind,
+    heartbeatTimeoutS: Int,
+    livyConf: LivyConf,
+    owner: String,
+    override val proxyUser: Option[String],
+    sessionStore: SessionStore,
+    mockApp: Option[SparkApp]) // For unit test.
+  extends Session(id, owner, livyConf)
+  with SessionHeartbeat
+  with SparkAppListener {
+
+  import InteractiveSession._
+
+  private var serverSideState: SessionState = initialState
+
+  override protected val heartbeatTimeout: FiniteDuration = {
+    val heartbeatTimeoutInSecond = heartbeatTimeoutS
+    Duration(heartbeatTimeoutInSecond, TimeUnit.SECONDS)
+  }
+  private val operations = mutable.Map[Long, String]()
+  private val operationCounter = new AtomicLong(0)
+  private var rscDriverUri: Option[URI] = None
+  private var sessionLog: IndexedSeq[String] = IndexedSeq.empty
+  private val sessionSaveLock = new Object()
+
+  _appId = appIdHint
+  sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
+  heartbeat()
+
+  private val app = mockApp.orElse {
+    if (livyConf.isRunningOnYarn()) {
+      val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
+        .map(new LineBufferedProcess(_))
+      // When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
+      // (e.g. Reflect YARN application state to session state).
+      Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
+    } else {
+      // When Livy is running with other cluster manager, SparkApp doesn't provide any
+      // additional benefit over controlling RSCDriver using RSCClient. Don't use it.
+      None
+    }
+  }
+
+  if (client.isEmpty) {
+    transition(Dead())
+    val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown."
+    info(msg)
+    sessionLog = IndexedSeq(msg)
+  } else {
+    val uriFuture = Future { client.get.getServerUri.get() }
+
+    uriFuture onSuccess { case url =>
+      rscDriverUri = Option(url)
+      sessionSaveLock.synchronized {
+        sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
+      }
+    }
+    uriFuture onFailure { case e => warn("Fail to get rsc uri", e) }
+
+    // Send a dummy job that will return once the client is ready to be used, and set the
+    // state to "idle" at that point.
+    client.get.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() {
+      override def onJobQueued(job: JobHandle[Void]): Unit = { }
+      override def onJobStarted(job: JobHandle[Void]): Unit = { }
+
+      override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()
+
+      override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut()
+
+      override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
+        transition(SessionState.Running())
+        info(s"Interactive session $id created [appid: ${appId.orNull}, owner: $owner, proxyUser:" +
+          s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " +
+          s"info: ${appInfo.asJavaMap}]")
+      }
+
+      private def errorOut(): Unit = {
+        // Other code might call stop() to close the RPC channel. When RPC channel is closing,
+        // this callback might be triggered. Check and don't call stop() to avoid nested called
+        // if the session is already shutting down.
+        if (serverSideState != SessionState.ShuttingDown()) {
+          transition(SessionState.Error())
+          stop()
+          app.foreach { a =>
+            info(s"Failed to ping RSC driver for session $id. Killing application.")
+            a.kill()
+          }
+        }
+      }
+    })
+  }
+
+  override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(sessionLog)
+
+  override def recoveryMetadata: RecoveryMetadata =
+    InteractiveRecoveryMetadata(
+      id, appId, appTag, kind, heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)
+
+  override def state: SessionState = {
+    if (serverSideState.isInstanceOf[SessionState.Running]) {
+      // If session is in running state, return the repl state from RSCClient.
+      client
+        .flatMap(s => Option(s.getReplState))
+        .map(SessionState(_))
+        .getOrElse(SessionState.Busy()) // If repl state is unknown, assume repl is busy.
+    } else {
+      serverSideState
+    }
+  }
+
+  override def stopSession(): Unit = {
+    try {
+      transition(SessionState.ShuttingDown())
+      sessionStore.remove(RECOVERY_SESSION_TYPE, id)
+      client.foreach { _.stop(true) }
+    } catch {
+      case _: Exception =>
+        app.foreach {
+          warn(s"Failed to stop RSCDriver. Killing it...")
+          _.kill()
+        }
+    } finally {
+      transition(SessionState.Dead())
+    }
+  }
+
+  def statements: IndexedSeq[Statement] = {
+    ensureActive()
+    val r = client.get.getReplJobResults().get()
+    r.statements.toIndexedSeq
+  }
+
+  def getStatement(stmtId: Int): Option[Statement] = {
+    ensureActive()
+    val r = client.get.getReplJobResults(stmtId, 1).get()
+    if (r.statements.length < 1) {
+      None
+    } else {
+      Option(r.statements(0))
+    }
+  }
+
+  def interrupt(): Future[Unit] = {
+    stop()
+  }
+
+  def executeStatement(content: ExecuteRequest): Statement = {
+    ensureRunning()
+    recordActivity()
+
+    val id = client.get.submitReplCode(content.code).get
+    client.get.getReplJobResults(id, 1).get().statements(0)
+  }
+
+  def cancelStatement(statementId: Int): Unit = {
+    ensureRunning()
+    recordActivity()
+    client.get.cancelReplCode(statementId)
+  }
+
+  def runJob(job: Array[Byte]): Long = {
+    performOperation(job, true)
+  }
+
+  def submitJob(job: Array[Byte]): Long = {
+    performOperation(job, false)
+  }
+
+  def addFile(fileStream: InputStream, fileName: String): Unit = {
+    addFile(copyResourceToHDFS(fileStream, fileName))
+  }
+
+  def addJar(jarStream: InputStream, jarName: String): Unit = {
+    addJar(copyResourceToHDFS(jarStream, jarName))
+  }
+
+  def addFile(uri: URI): Unit = {
+    ensureActive()
+    recordActivity()
+    client.get.addFile(resolveURI(uri, livyConf)).get()
+  }
+
+  def addJar(uri: URI): Unit = {
+    ensureActive()
+    recordActivity()
+    client.get.addJar(resolveURI(uri, livyConf)).get()
+  }
+
+  def jobStatus(id: Long): Any = {
+    ensureActive()
+    val clientJobId = operations(id)
+    recordActivity()
+    // TODO: don't block indefinitely?
+    val status = client.get.getBypassJobStatus(clientJobId).get()
+    new JobStatus(id, status.state, status.result, status.error)
+  }
+
+  def cancelJob(id: Long): Unit = {
+    ensureActive()
+    recordActivity()
+    operations.remove(id).foreach { client.get.cancel }
+  }
+
+  private def transition(newState: SessionState) = synchronized {
+    // When a statement returns an error, the session should transit to error state.
+    // If the session crashed because of the error, the session should instead go to dead state.
+    // Since these 2 transitions are triggered by different threads, there's a race condition.
+    // Make sure we won't transit from dead to error state.
+    val areSameStates = serverSideState.getClass() == newState.getClass()
+    val transitFromInactiveToActive = !serverSideState.isActive && newState.isActive
+    if (!areSameStates && !transitFromInactiveToActive) {
+      debug(s"$this session state change from ${serverSideState} to $newState")
+      serverSideState = newState
+    }
+  }
+
+  private def ensureActive(): Unit = synchronized {
+    require(serverSideState.isActive, "Session isn't active.")
+    require(client.isDefined, "Session is active but client hasn't been created.")
+  }
+
+  private def ensureRunning(): Unit = synchronized {
+    serverSideState match {
+      case SessionState.Running() =>
+      case _ =>
+        throw new IllegalStateException("Session is in state %s" format serverSideState)
+    }
+  }
+
+  private def performOperation(job: Array[Byte], sync: Boolean): Long = {
+    ensureActive()
+    recordActivity()
+    val future = client.get.bypass(ByteBuffer.wrap(job), sync)
+    val opId = operationCounter.incrementAndGet()
+    operations(opId) = future
+    opId
+   }
+
+  override def appIdKnown(appId: String): Unit = {
+    _appId = Option(appId)
+    sessionSaveLock.synchronized {
+      sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
+    }
+  }
+
+  override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
+    synchronized {
+      debug(s"$this app state changed from $oldState to $newState")
+      newState match {
+        case SparkApp.State.FINISHED | SparkApp.State.KILLED | SparkApp.State.FAILED =>
+          transition(SessionState.Dead())
+        case _ =>
+      }
+    }
+  }
+
+  override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
new file mode 100644
index 0000000..3066a98
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.interactive
+
+import java.net.URI
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.JavaConverters._
+import scala.concurrent._
+import scala.concurrent.duration._
+
+import org.json4s.jackson.Json4sScalaModule
+import org.scalatra._
+import org.scalatra.servlet.FileUploadSupport
+
+import org.apache.livy.{ExecuteRequest, JobHandle, LivyConf, Logging}
+import org.apache.livy.client.common.HttpMessages
+import org.apache.livy.client.common.HttpMessages._
+import org.apache.livy.rsc.driver.Statement
+import org.apache.livy.server.SessionServlet
+import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.sessions._
+
+object InteractiveSessionServlet extends Logging
+
+class InteractiveSessionServlet(
+    sessionManager: InteractiveSessionManager,
+    sessionStore: SessionStore,
+    livyConf: LivyConf)
+  extends SessionServlet(sessionManager, livyConf)
+  with SessionHeartbeatNotifier[InteractiveSession, InteractiveRecoveryMetadata]
+  with FileUploadSupport
+{
+
+  mapper.registerModule(new SessionKindModule())
+    .registerModule(new Json4sScalaModule())
+
+  override protected def createSession(req: HttpServletRequest): InteractiveSession = {
+    val createRequest = bodyAs[CreateInteractiveRequest](req)
+    val proxyUser = checkImpersonation(createRequest.proxyUser, req)
+    InteractiveSession.create(
+      sessionManager.nextId(),
+      remoteUser(req),
+      proxyUser,
+      livyConf,
+      createRequest,
+      sessionStore)
+  }
+
+  override protected[interactive] def clientSessionView(
+      session: InteractiveSession,
+      req: HttpServletRequest): Any = {
+    val logs =
+      if (hasAccess(session.owner, req)) {
+        Option(session.logLines())
+          .map { lines =>
+            val size = 10
+            var from = math.max(0, lines.length - size)
+            val until = from + size
+
+            lines.view(from, until)
+          }
+          .getOrElse(Nil)
+      } else {
+        Nil
+      }
+
+    new SessionInfo(session.id, session.appId.orNull, session.owner, session.proxyUser.orNull,
+      session.state.toString, session.kind.toString, session.appInfo.asJavaMap, logs.asJava)
+  }
+
+  post("/:id/stop") {
+    withSession { session =>
+      Await.ready(session.stop(), Duration.Inf)
+      NoContent()
+    }
+  }
+
+  post("/:id/interrupt") {
+    withSession { session =>
+      Await.ready(session.interrupt(), Duration.Inf)
+      Ok(Map("msg" -> "interrupted"))
+    }
+  }
+
+  get("/:id/statements") {
+    withSession { session =>
+      val statements = session.statements
+      val from = params.get("from").map(_.toInt).getOrElse(0)
+      val size = params.get("size").map(_.toInt).getOrElse(statements.length)
+
+      Map(
+        "total_statements" -> statements.length,
+        "statements" -> statements.view(from, from + size)
+      )
+    }
+  }
+
+  val getStatement = get("/:id/statements/:statementId") {
+    withSession { session =>
+      val statementId = params("statementId").toInt
+
+      session.getStatement(statementId).getOrElse(NotFound("Statement not found"))
+    }
+  }
+
+  jpost[ExecuteRequest]("/:id/statements") { req =>
+    withSession { session =>
+      val statement = session.executeStatement(req)
+
+      Created(statement,
+        headers = Map(
+          "Location" -> url(getStatement,
+            "id" -> session.id.toString,
+            "statementId" -> statement.id.toString)))
+    }
+  }
+
+  post("/:id/statements/:statementId/cancel") {
+    withSession { session =>
+      val statementId = params("statementId")
+      session.cancelStatement(statementId.toInt)
+      Ok(Map("msg" -> "canceled"))
+    }
+  }
+  // This endpoint is used by the client-http module to "connect" to an existing session and
+  // update its last activity time. It performs authorization checks to make sure the caller
+  // has access to the session, so even though it returns the same data, it behaves differently
+  // from get("/:id").
+  post("/:id/connect") {
+    withSession { session =>
+      session.recordActivity()
+      Ok(clientSessionView(session, request))
+    }
+  }
+
+  jpost[SerializedJob]("/:id/submit-job") { req =>
+    withSession { session =>
+      try {
+      require(req.job != null && req.job.length > 0, "no job provided.")
+      val jobId = session.submitJob(req.job)
+      Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
+      } catch {
+        case e: Throwable =>
+          e.printStackTrace()
+        throw e
+      }
+    }
+  }
+
+  jpost[SerializedJob]("/:id/run-job") { req =>
+    withSession { session =>
+      require(req.job != null && req.job.length > 0, "no job provided.")
+      val jobId = session.runJob(req.job)
+      Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
+    }
+  }
+
+  post("/:id/upload-jar") {
+    withSession { lsession =>
+      fileParams.get("jar") match {
+        case Some(file) =>
+          lsession.addJar(file.getInputStream, file.name)
+        case None =>
+          BadRequest("No jar sent!")
+      }
+    }
+  }
+
+  post("/:id/upload-pyfile") {
+    withSession { lsession =>
+      fileParams.get("file") match {
+        case Some(file) =>
+          lsession.addJar(file.getInputStream, file.name)
+        case None =>
+          BadRequest("No file sent!")
+      }
+    }
+  }
+
+  post("/:id/upload-file") {
+    withSession { lsession =>
+      fileParams.get("file") match {
+        case Some(file) =>
+          lsession.addFile(file.getInputStream, file.name)
+        case None =>
+          BadRequest("No file sent!")
+      }
+    }
+  }
+
+  jpost[AddResource]("/:id/add-jar") { req =>
+    withSession { lsession =>
+      addJarOrPyFile(req, lsession)
+    }
+  }
+
+  jpost[AddResource]("/:id/add-pyfile") { req =>
+    withSession { lsession =>
+      lsession.kind match {
+        case PySpark() | PySpark3() => addJarOrPyFile(req, lsession)
+        case _ => BadRequest("Only supported for pyspark sessions.")
+      }
+    }
+  }
+
+  jpost[AddResource]("/:id/add-file") { req =>
+    withSession { lsession =>
+      val uri = new URI(req.uri)
+      lsession.addFile(uri)
+    }
+  }
+
+  get("/:id/jobs/:jobid") {
+    withSession { lsession =>
+      val jobId = params("jobid").toLong
+      Ok(lsession.jobStatus(jobId))
+    }
+  }
+
+  post("/:id/jobs/:jobid/cancel") {
+    withSession { lsession =>
+      val jobId = params("jobid").toLong
+      lsession.cancelJob(jobId)
+    }
+  }
+
+  private def addJarOrPyFile(req: HttpMessages.AddResource, session: InteractiveSession): Unit = {
+    val uri = new URI(req.uri)
+    session.addJar(uri)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/interactive/SessionHeartbeat.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/SessionHeartbeat.scala b/server/src/main/scala/org/apache/livy/server/interactive/SessionHeartbeat.scala
new file mode 100644
index 0000000..b9a1fd7
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/interactive/SessionHeartbeat.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.server.interactive
+
+import java.util.Date
+
+import scala.concurrent.duration.{Deadline, Duration, FiniteDuration}
+
+import org.apache.livy.sessions.Session.RecoveryMetadata
+import org.apache.livy.LivyConf
+import org.apache.livy.server.SessionServlet
+import org.apache.livy.sessions.{Session, SessionManager}
+
+/**
+  * A session trait to provide heartbeat expiration check.
+  * Note: Session will not expire if heartbeat() was never called.
+  */
+trait SessionHeartbeat {
+  protected val heartbeatTimeout: FiniteDuration
+
+  private var _lastHeartbeat: Date = _ // For reporting purpose
+  private var heartbeatDeadline: Option[Deadline] = None
+
+  def heartbeat(): Unit = synchronized {
+    if (heartbeatTimeout > Duration.Zero) {
+      heartbeatDeadline = Some(heartbeatTimeout.fromNow)
+    }
+
+    _lastHeartbeat = new Date()
+  }
+
+  def lastHeartbeat: Date = synchronized { _lastHeartbeat }
+
+  def heartbeatExpired: Boolean = synchronized { heartbeatDeadline.exists(_.isOverdue()) }
+}
+
+/**
+  * Servlet can mixin this trait to update session's heartbeat
+  * whenever a /sessions/:id REST call is made. e.g. GET /sessions/:id
+  * Note: GET /sessions doesn't update heartbeats.
+  */
+trait SessionHeartbeatNotifier[S <: Session with SessionHeartbeat, R <: RecoveryMetadata]
+  extends SessionServlet[S, R] {
+
+  abstract override protected def withUnprotectedSession(fn: (S => Any)): Any = {
+    super.withUnprotectedSession { s =>
+      s.heartbeat()
+      fn(s)
+    }
+  }
+
+  abstract override protected def withSession(fn: (S => Any)): Any = {
+    super.withSession { s =>
+      s.heartbeat()
+      fn(s)
+    }
+  }
+}
+
+/**
+  * A SessionManager trait.
+  * It will create a thread that periodically deletes sessions with expired heartbeat.
+  */
+trait SessionHeartbeatWatchdog[S <: Session with SessionHeartbeat, R <: RecoveryMetadata] {
+  self: SessionManager[S, R] =>
+
+  private val watchdogThread = new Thread(s"HeartbeatWatchdog-${self.getClass.getName}") {
+    override def run(): Unit = {
+      val interval = livyConf.getTimeAsMs(LivyConf.HEARTBEAT_WATCHDOG_INTERVAL)
+      info("Heartbeat watchdog thread started.")
+      while (true) {
+        deleteExpiredSessions()
+        Thread.sleep(interval)
+      }
+    }
+  }
+
+  protected def start(): Unit = {
+    assert(!watchdogThread.isAlive())
+
+    watchdogThread.setDaemon(true)
+    watchdogThread.start()
+  }
+
+  private[interactive] def deleteExpiredSessions(): Unit = {
+    // Delete takes time. If we use .filter().foreach() here, the time difference between we check
+    // expiration and the time we delete the session might be huge. To avoid that, check expiration
+    // inside the foreach block.
+    sessions.values.foreach { s =>
+      if (s.heartbeatExpired) {
+        info(s"Session ${s.id} expired. Last heartbeat is at ${s.lastHeartbeat}.")
+        try { delete(s) } catch {
+          case t: Throwable =>
+            warn(s"Exception was thrown when deleting expired session ${s.id}", t)
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala
new file mode 100644
index 0000000..df9a712
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/recovery/BlackholeStateStore.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.reflect.ClassTag
+
+import org.apache.livy.LivyConf
+
+/**
+ * This is a blackhole implementation of StateStore.
+ * Livy will use this when session recovery is disabled.
+ */
+class BlackholeStateStore(livyConf: LivyConf) extends StateStore(livyConf) {
+  def set(key: String, value: Object): Unit = {}
+
+  def get[T: ClassTag](key: String): Option[T] = None
+
+  def getChildren(key: String): Seq[String] = List.empty[String]
+
+  def remove(key: String): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
new file mode 100644
index 0000000..d5f8f3d
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import java.io.{FileNotFoundException, IOException}
+import java.net.URI
+import java.util
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+
+import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.Utils.usingResource
+
+class FileSystemStateStore(
+    livyConf: LivyConf,
+    mockFileContext: Option[FileContext])
+  extends StateStore(livyConf) with Logging {
+
+  // Constructor defined for StateStore factory to new this class using reflection.
+  def this(livyConf: LivyConf) {
+    this(livyConf, None)
+  }
+
+  private val fsUri = {
+    val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
+    require(!fsPath.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
+    new URI(fsPath)
+  }
+
+  private val fileContext: FileContext = mockFileContext.getOrElse {
+    FileContext.getFileContext(fsUri)
+  }
+
+  {
+    // Only Livy user should have access to state files.
+    fileContext.setUMask(new FsPermission("077"))
+
+    // Create state store dir if it doesn't exist.
+    val stateStorePath = absPath(".")
+    try {
+      fileContext.mkdir(stateStorePath, FsPermission.getDirDefault(), true)
+    } catch {
+      case _: FileAlreadyExistsException =>
+        if (!fileContext.getFileStatus(stateStorePath).isDirectory()) {
+          throw new IOException(s"$stateStorePath is not a directory.")
+        }
+    }
+
+    // Check permission of state store dir.
+    val fileStatus = fileContext.getFileStatus(absPath("."))
+    require(fileStatus.getPermission.getUserAction() == FsAction.ALL,
+      s"Livy doesn't have permission to access state store: $fsUri.")
+    if (fileStatus.getPermission.getGroupAction != FsAction.NONE) {
+      warn(s"Group users have permission to access state store: $fsUri. This is insecure.")
+    }
+    if (fileStatus.getPermission.getOtherAction != FsAction.NONE) {
+      warn(s"Other users have permission to access state store: $fsUri. This is in secure.")
+    }
+  }
+
+  override def set(key: String, value: Object): Unit = {
+    // Write to a temp file then rename to avoid file corruption if livy-server crashes
+    // in the middle of the write.
+    val tmpPath = absPath(s"$key.tmp")
+    val createFlag = util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+
+    usingResource(fileContext.create(tmpPath, createFlag, CreateOpts.createParent())) { tmpFile =>
+      tmpFile.write(serializeToBytes(value))
+      tmpFile.close()
+      // Assume rename is atomic.
+      fileContext.rename(tmpPath, absPath(key), Rename.OVERWRITE)
+    }
+
+    try {
+      val crcPath = new Path(tmpPath.getParent, s".${tmpPath.getName}.crc")
+      fileContext.delete(crcPath, false)
+    } catch {
+      case NonFatal(e) => // Swallow the exception.
+    }
+  }
+
+  override def get[T: ClassTag](key: String): Option[T] = {
+    try {
+      usingResource(fileContext.open(absPath(key))) { is =>
+        Option(deserialize[T](IOUtils.toByteArray(is)))
+      }
+    } catch {
+      case _: FileNotFoundException => None
+      case e: IOException =>
+        warn(s"Failed to read $key from state store.", e)
+        None
+    }
+  }
+
+  override def getChildren(key: String): Seq[String] = {
+    try {
+      fileContext.util.listStatus(absPath(key)).map(_.getPath.getName)
+    } catch {
+      case _: FileNotFoundException => Seq.empty
+      case e: IOException =>
+        warn(s"Failed to list $key from state store.", e)
+        Seq.empty
+    }
+  }
+
+  override def remove(key: String): Unit = {
+    fileContext.delete(absPath(key), false)
+  }
+
+  private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala
new file mode 100644
index 0000000..0429295
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/recovery/SessionStore.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import java.io.IOException
+
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.sessions.Session.RecoveryMetadata
+
+private[recovery] case class SessionManagerState(nextSessionId: Int)
+
+/**
+ * SessionStore provides high level functions to get/save session state from/to StateStore.
+ */
+class SessionStore(
+    livyConf: LivyConf,
+    store: => StateStore = StateStore.get) // For unit testing.
+  extends Logging {
+
+  private val STORE_VERSION: String = "v1"
+
+  /**
+   * Persist a session to the session state store.
+   * @param m RecoveryMetadata for the session.
+   */
+  def save(sessionType: String, m: RecoveryMetadata): Unit = {
+    store.set(sessionPath(sessionType, m.id), m)
+  }
+
+  def saveNextSessionId(sessionType: String, id: Int): Unit = {
+    store.set(sessionManagerPath(sessionType), SessionManagerState(id))
+  }
+
+  /**
+   * Return all sessions stored in the store with specified session type.
+   */
+  def getAllSessions[T <: RecoveryMetadata : ClassTag](sessionType: String): Seq[Try[T]] = {
+    store.getChildren(sessionPath(sessionType))
+      .flatMap { c => Try(c.toInt).toOption } // Ignore all non numerical keys
+      .flatMap { id =>
+        val p = sessionPath(sessionType, id)
+        try {
+          store.get[T](p).map(Success(_))
+        } catch {
+          case NonFatal(e) => Some(Failure(new IOException(s"Error getting session $p", e)))
+        }
+      }
+  }
+
+  /**
+   * Return the next unused session id with specified session type.
+   * If checks the SessionManagerState stored and returns the next free session id.
+   * If no SessionManagerState is stored, it returns 0.
+   *
+   * @throws Exception If SessionManagerState stored is corrupted, it throws an error.
+   */
+  def getNextSessionId(sessionType: String): Int = {
+    store.get[SessionManagerState](sessionManagerPath(sessionType))
+      .map(_.nextSessionId).getOrElse(0)
+  }
+
+  /**
+   * Remove a session from the state store.
+   */
+  def remove(sessionType: String, id: Int): Unit = {
+    store.remove(sessionPath(sessionType, id))
+  }
+
+  private def sessionManagerPath(sessionType: String): String =
+    s"$STORE_VERSION/$sessionType/state"
+
+  private def sessionPath(sessionType: String): String =
+    s"$STORE_VERSION/$sessionType"
+
+  private def sessionPath(sessionType: String, id: Int): String =
+    s"$STORE_VERSION/$sessionType/$id"
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala
new file mode 100644
index 0000000..a6c3275
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.server.recovery
+
+import scala.reflect.{classTag, ClassTag}
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.sessions.SessionKindModule
+import org.apache.livy.sessions.SessionManager._
+
+protected trait JsonMapper {
+  protected val mapper = new ObjectMapper()
+    .registerModule(DefaultScalaModule)
+    .registerModule(new SessionKindModule())
+
+  def serializeToBytes(value: Object): Array[Byte] = mapper.writeValueAsBytes(value)
+
+  def deserialize[T: ClassTag](json: Array[Byte]): T =
+    mapper.readValue(json, classTag[T].runtimeClass.asInstanceOf[Class[T]])
+}
+
+/**
+ * Interface of a key-value pair storage for state storage.
+ * It's responsible for de/serialization and retrieving/storing object.
+ * It's the low level interface used by higher level classes like SessionStore.
+ *
+ * Hardcoded to use JSON serialization for now for easier ops. Will add better serialization later.
+ */
+abstract class StateStore(livyConf: LivyConf) extends JsonMapper {
+  /**
+   * Set a key-value pair to this state store. It overwrites existing value.
+   * @throws Exception Throw when persisting the state store fails.
+   */
+  def set(key: String, value: Object): Unit
+
+  /**
+   * Get a key-value pair from this state store.
+   * @return Value if the key exists. None if the key doesn't exist.
+   * @throws Exception Throw when deserialization of the stored value fails.
+   */
+  def get[T: ClassTag](key: String): Option[T]
+
+  /**
+   * Treat keys in this state store as a directory tree and
+   * return names of the direct children of the key.
+   * @return List of names of the direct children of the key.
+   *         Empty list if the key doesn't exist or have no child.
+   */
+  def getChildren(key: String): Seq[String]
+
+  /**
+   * Remove the key from this state store. Does not throw if the key doesn't exist.
+   * @throws Exception Throw when persisting the state store fails.
+   */
+  def remove(key: String): Unit
+}
+
+/**
+ * Factory to create the store chosen in LivyConf.
+ */
+object StateStore extends Logging {
+  private[this] var stateStore: Option[StateStore] = None
+
+  def init(livyConf: LivyConf): Unit = synchronized {
+    if (stateStore.isEmpty) {
+      val fileStateStoreClassTag = pickStateStore(livyConf)
+      stateStore = Option(fileStateStoreClassTag.getDeclaredConstructor(classOf[LivyConf])
+        .newInstance(livyConf).asInstanceOf[StateStore])
+      info(s"Using ${stateStore.get.getClass.getSimpleName} for recovery.")
+    }
+  }
+
+  def cleanup(): Unit = synchronized {
+    stateStore = None
+  }
+
+  def get: StateStore = {
+    assert(stateStore.isDefined, "StateStore hasn't been initialized.")
+    stateStore.get
+  }
+
+  private[recovery] def pickStateStore(livyConf: LivyConf): Class[_] = {
+    livyConf.get(LivyConf.RECOVERY_MODE) match {
+      case SESSION_RECOVERY_MODE_OFF => classOf[BlackholeStateStore]
+      case SESSION_RECOVERY_MODE_RECOVERY =>
+        livyConf.get(LivyConf.RECOVERY_STATE_STORE) match {
+          case "filesystem" => classOf[FileSystemStateStore]
+          case "zookeeper" => classOf[ZooKeeperStateStore]
+          case ss => throw new IllegalArgumentException(s"Unsupported state store $ss")
+        }
+      case rm => throw new IllegalArgumentException(s"Unsupported recovery mode $rm")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
new file mode 100644
index 0000000..ec6b9df
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.server.recovery
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import scala.util.Try
+
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.framework.api.UnhandledErrorListener
+import org.apache.curator.retry.RetryNTimes
+import org.apache.zookeeper.KeeperException.NoNodeException
+
+import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.LivyConf.Entry
+
+object ZooKeeperStateStore {
+  val ZK_KEY_PREFIX_CONF = Entry("livy.server.recovery.zk-state-store.key-prefix", "livy")
+  val ZK_RETRY_CONF = Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100")
+}
+
+class ZooKeeperStateStore(
+    livyConf: LivyConf,
+    mockCuratorClient: Option[CuratorFramework] = None) // For testing
+  extends StateStore(livyConf) with Logging {
+
+  import ZooKeeperStateStore._
+
+  // Constructor defined for StateStore factory to new this class using reflection.
+  def this(livyConf: LivyConf) {
+    this(livyConf, None)
+  }
+
+  private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
+  require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
+  private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF)
+  private val retryValue = livyConf.get(ZK_RETRY_CONF)
+  // a regex to match patterns like "m, n" where m and n both are integer values
+  private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
+  private[recovery] val retryPolicy = retryValue match {
+    case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
+    case _ => throw new IllegalArgumentException(
+      s"$ZK_KEY_PREFIX_CONF contains bad value: $retryValue. " +
+        "Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
+  }
+
+  private val curatorClient = mockCuratorClient.getOrElse {
+    CuratorFrameworkFactory.newClient(zkAddress, retryPolicy)
+  }
+
+  Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+    override def run(): Unit = {
+      curatorClient.close()
+    }
+  }))
+
+  curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener {
+    def unhandledError(message: String, e: Throwable): Unit = {
+      error(s"Fatal Zookeeper error. Shutting down Livy server.")
+      System.exit(1)
+    }
+  })
+  curatorClient.start()
+  // TODO Make sure ZK path has proper secure permissions so that other users cannot read its
+  // contents.
+
+  override def set(key: String, value: Object): Unit = {
+    val prefixedKey = prefixKey(key)
+    val data = serializeToBytes(value)
+    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
+      curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data)
+    } else {
+      curatorClient.setData().forPath(prefixedKey, data)
+    }
+  }
+
+  override def get[T: ClassTag](key: String): Option[T] = {
+    val prefixedKey = prefixKey(key)
+    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
+      None
+    } else {
+      Option(deserialize[T](curatorClient.getData().forPath(prefixedKey)))
+    }
+  }
+
+  override def getChildren(key: String): Seq[String] = {
+    val prefixedKey = prefixKey(key)
+    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
+      Seq.empty[String]
+    } else {
+      curatorClient.getChildren.forPath(prefixedKey).asScala
+    }
+  }
+
+  override def remove(key: String): Unit = {
+    try {
+      curatorClient.delete().guaranteed().forPath(prefixKey(key))
+    } catch {
+      case _: NoNodeException =>
+    }
+  }
+
+  private def prefixKey(key: String) = s"/$zkKeyPrefix/$key"
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/server/ui/UIServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/ui/UIServlet.scala b/server/src/main/scala/org/apache/livy/server/ui/UIServlet.scala
new file mode 100644
index 0000000..7396bdc
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/ui/UIServlet.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.ui
+
+import scala.xml.Node
+
+import org.scalatra.ScalatraServlet
+
+class UIServlet extends ScalatraServlet {
+  before() { contentType = "text/html" }
+
+  def getHeader(title: String): Seq[Node] =
+    <head>
+      <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css"/>
+      <link rel="stylesheet" href="/static/dataTables.bootstrap.min.css" type="text/css"/>
+      <link rel="stylesheet" href="/static/livy-ui.css" type="text/css"/>
+      <script src="/static/jquery-3.2.1.min.js"></script>
+      <script src="/static/bootstrap.min.js"></script>
+      <script src="/static/jquery.dataTables.min.js"></script>
+      <script src="/static/dataTables.bootstrap.min.js"></script>
+      <script src="/static/all-sessions.js"></script>
+      <title>{title}</title>
+    </head>
+
+  def navBar(pageName: String): Seq[Node] =
+    <nav class="navbar navbar-default">
+      <div class="container-fluid">
+        <div class="navbar-header">
+          <a class="navbar-brand" href="#">
+            <img alt="Livy" src="/static/livy-mini-logo.png"/>
+          </a>
+        </div>
+        <div class="collapse navbar-collapse">
+          <ul class="nav navbar-nav">
+            <li><a href="#">{pageName}</a></li>
+          </ul>
+        </div>
+      </div>
+    </nav>
+
+  def createPage(pageName: String, pageContents: Seq[Node]): Seq[Node] =
+    <html>
+      {getHeader("Livy - " + pageName)}
+      <body>
+        <div class="container">
+          {navBar(pageName)}
+          {pageContents}
+        </div>
+      </body>
+    </html>
+
+  get("/") {
+    val content =
+      <div id="all-sessions">
+        <div id="interactive-sessions"></div>
+        <div id="batches"></div>
+      </div>
+
+    createPage("Sessions", content)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/org/apache/livy/sessions/Session.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala b/server/src/main/scala/org/apache/livy/sessions/Session.scala
new file mode 100644
index 0000000..d467076
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.sessions
+
+import java.io.InputStream
+import java.net.{URI, URISyntaxException}
+import java.security.PrivilegedExceptionAction
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.livy.{LivyConf, Logging, Utils}
+import org.apache.livy.utils.AppInfo
+
+object Session {
+  trait RecoveryMetadata { val id: Int }
+
+  lazy val configBlackList: Set[String] = {
+    val url = getClass.getResource("/spark-blacklist.conf")
+    if (url != null) Utils.loadProperties(url).keySet else Set()
+  }
+
+  /**
+   * Validates and prepares a user-provided configuration for submission.
+   *
+   * - Verifies that no blacklisted configurations are provided.
+   * - Merges file lists in the configuration with the explicit lists provided in the request
+   * - Resolve file URIs to make sure they reference the default FS
+   * - Verify that file URIs don't reference non-whitelisted local resources
+   */
+  def prepareConf(
+      conf: Map[String, String],
+      jars: Seq[String],
+      files: Seq[String],
+      archives: Seq[String],
+      pyFiles: Seq[String],
+      livyConf: LivyConf): Map[String, String] = {
+    if (conf == null) {
+      return Map()
+    }
+
+    val errors = conf.keySet.filter(configBlackList.contains)
+    if (errors.nonEmpty) {
+      throw new IllegalArgumentException(
+        "Blacklisted configuration values in session config: " + errors.mkString(", "))
+    }
+
+    val confLists: Map[String, Seq[String]] = livyConf.sparkFileLists
+      .map { key => (key -> Nil) }.toMap
+
+    val userLists = confLists ++ Map(
+      LivyConf.SPARK_JARS -> jars,
+      LivyConf.SPARK_FILES -> files,
+      LivyConf.SPARK_ARCHIVES -> archives,
+      LivyConf.SPARK_PY_FILES -> pyFiles)
+
+    val merged = userLists.flatMap { case (key, list) =>
+      val confList = conf.get(key)
+        .map { list =>
+          resolveURIs(list.split("[, ]+").toSeq, livyConf)
+        }
+        .getOrElse(Nil)
+      val userList = resolveURIs(list, livyConf)
+      if (confList.nonEmpty || userList.nonEmpty) {
+        Some(key -> (userList ++ confList).mkString(","))
+      } else {
+        None
+      }
+    }
+
+    val masterConfList = Map(LivyConf.SPARK_MASTER -> livyConf.sparkMaster()) ++
+      livyConf.sparkDeployMode().map(LivyConf.SPARK_DEPLOY_MODE -> _).toMap
+
+    conf ++ masterConfList ++ merged
+  }
+
+  /**
+   * Prepends the value of the "fs.defaultFS" configuration to any URIs that do not have a
+   * scheme. URIs are required to at least be absolute paths.
+   *
+   * @throws IllegalArgumentException If an invalid URI is found in the given list.
+   */
+  def resolveURIs(uris: Seq[String], livyConf: LivyConf): Seq[String] = {
+    val defaultFS = livyConf.hadoopConf.get("fs.defaultFS").stripSuffix("/")
+    uris.filter(_.nonEmpty).map { _uri =>
+      val uri = try {
+        new URI(_uri)
+      } catch {
+        case e: URISyntaxException => throw new IllegalArgumentException(e)
+      }
+      resolveURI(uri, livyConf).toString()
+    }
+  }
+
+  def resolveURI(uri: URI, livyConf: LivyConf): URI = {
+    val defaultFS = livyConf.hadoopConf.get("fs.defaultFS").stripSuffix("/")
+    val resolved =
+      if (uri.getScheme() == null) {
+        require(uri.getPath().startsWith("/"), s"Path '${uri.getPath()}' is not absolute.")
+        new URI(defaultFS + uri.getPath())
+      } else {
+        uri
+      }
+
+    if (resolved.getScheme() == "file") {
+      // Make sure the location is whitelisted before allowing local files to be added.
+      require(livyConf.localFsWhitelist.find(resolved.getPath().startsWith).isDefined,
+        s"Local path ${uri.getPath()} cannot be added to user sessions.")
+    }
+
+    resolved
+  }
+}
+
+abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf)
+  extends Logging {
+
+  import Session._
+
+  protected implicit val executionContext = ExecutionContext.global
+
+  protected var _appId: Option[String] = None
+
+  private var _lastActivity = System.nanoTime()
+
+  // Directory where the session's staging files are created. The directory is only accessible
+  // to the session's effective user.
+  private var stagingDir: Path = null
+
+  def appId: Option[String] = _appId
+
+  var appInfo: AppInfo = AppInfo()
+
+  def lastActivity: Long = state match {
+    case SessionState.Error(time) => time
+    case SessionState.Dead(time) => time
+    case SessionState.Success(time) => time
+    case _ => _lastActivity
+  }
+
+  def logLines(): IndexedSeq[String]
+
+  def recordActivity(): Unit = {
+    _lastActivity = System.nanoTime()
+  }
+
+  def recoveryMetadata: RecoveryMetadata
+
+  def state: SessionState
+
+  def stop(): Future[Unit] = Future {
+    try {
+      info(s"Stopping $this...")
+      stopSession()
+      info(s"Stopped $this.")
+    } catch {
+      case e: Exception =>
+        warn(s"Error stopping session $id.", e)
+    }
+
+    try {
+      if (stagingDir != null) {
+        debug(s"Deleting session $id staging directory $stagingDir")
+        doAsOwner {
+          val fs = FileSystem.newInstance(livyConf.hadoopConf)
+          try {
+            fs.delete(stagingDir, true)
+          } finally {
+            fs.close()
+          }
+        }
+      }
+    } catch {
+      case e: Exception =>
+        warn(s"Error cleaning up session $id staging dir.", e)
+    }
+  }
+
+
+  override def toString(): String = s"${this.getClass.getSimpleName} $id"
+
+  protected def stopSession(): Unit
+
+  protected val proxyUser: Option[String]
+
+  protected def doAsOwner[T](fn: => T): T = {
+    val user = proxyUser.getOrElse(owner)
+    if (user != null) {
+      val ugi = if (UserGroupInformation.isSecurityEnabled) {
+        if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
+          UserGroupInformation.createProxyUser(user, UserGroupInformation.getCurrentUser())
+        } else {
+          UserGroupInformation.getCurrentUser()
+        }
+      } else {
+        UserGroupInformation.createRemoteUser(user)
+      }
+      ugi.doAs(new PrivilegedExceptionAction[T] {
+        override def run(): T = fn
+      })
+    } else {
+      fn
+    }
+  }
+
+  protected def copyResourceToHDFS(dataStream: InputStream, name: String): URI = doAsOwner {
+    val fs = FileSystem.newInstance(livyConf.hadoopConf)
+
+    try {
+      val filePath = new Path(getStagingDir(fs), name)
+      debug(s"Uploading user file to $filePath")
+
+      val outFile = fs.create(filePath, true)
+      val buffer = new Array[Byte](512 * 1024)
+      var read = -1
+      try {
+        while ({read = dataStream.read(buffer); read != -1}) {
+          outFile.write(buffer, 0, read)
+        }
+      } finally {
+        outFile.close()
+      }
+      filePath.toUri
+    } finally {
+      fs.close()
+    }
+  }
+
+  private def getStagingDir(fs: FileSystem): Path = synchronized {
+    if (stagingDir == null) {
+      val stagingRoot = Option(livyConf.get(LivyConf.SESSION_STAGING_DIR)).getOrElse {
+        new Path(fs.getHomeDirectory(), ".livy-sessions").toString()
+      }
+
+      val sessionDir = new Path(stagingRoot, UUID.randomUUID().toString())
+      fs.mkdirs(sessionDir)
+      fs.setPermission(sessionDir, new FsPermission("700"))
+      stagingDir = sessionDir
+      debug(s"Session $id staging directory is $stagingDir")
+    }
+    stagingDir
+  }
+
+}