You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:37 UTC
[08/33] incubator-livy git commit: LIVY-375. Change Livy code package
name to org.apache.livy
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
deleted file mode 100644
index 7d7b94e..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
+++ /dev/null
@@ -1,609 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.interactive
-
-import java.io.{File, InputStream}
-import java.net.URI
-import java.nio.ByteBuffer
-import java.nio.file.{Files, Paths}
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.util.Random
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-import com.google.common.annotations.VisibleForTesting
-import org.apache.hadoop.fs.Path
-import org.apache.spark.launcher.SparkLauncher
-
-import com.cloudera.livy._
-import com.cloudera.livy.client.common.HttpMessages._
-import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf}
-import com.cloudera.livy.rsc.driver.Statement
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions._
-import com.cloudera.livy.sessions.Session._
-import com.cloudera.livy.sessions.SessionState.Dead
-import com.cloudera.livy.util.LineBufferedProcess
-import com.cloudera.livy.utils.{AppInfo, LivySparkUtils, SparkApp, SparkAppListener}
-
-@JsonIgnoreProperties(ignoreUnknown = true)
-case class InteractiveRecoveryMetadata(
- id: Int,
- appId: Option[String],
- appTag: String,
- kind: Kind,
- heartbeatTimeoutS: Int,
- owner: String,
- proxyUser: Option[String],
- rscDriverUri: Option[URI],
- version: Int = 1)
- extends RecoveryMetadata
-
-object InteractiveSession extends Logging {
- private[interactive] val SPARK_YARN_IS_PYTHON = "spark.yarn.isPython"
-
- val RECOVERY_SESSION_TYPE = "interactive"
-
- def create(
- id: Int,
- owner: String,
- proxyUser: Option[String],
- livyConf: LivyConf,
- request: CreateInteractiveRequest,
- sessionStore: SessionStore,
- mockApp: Option[SparkApp] = None,
- mockClient: Option[RSCClient] = None): InteractiveSession = {
- val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}"
-
- val client = mockClient.orElse {
- val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
- request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
-
- val builderProperties = prepareBuilderProp(conf, request.kind, livyConf)
-
- val userOpts: Map[String, Option[String]] = Map(
- "spark.driver.cores" -> request.driverCores.map(_.toString),
- SparkLauncher.DRIVER_MEMORY -> request.driverMemory.map(_.toString),
- SparkLauncher.EXECUTOR_CORES -> request.executorCores.map(_.toString),
- SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString),
- "spark.executor.instances" -> request.numExecutors.map(_.toString),
- "spark.app.name" -> request.name.map(_.toString),
- "spark.yarn.queue" -> request.queue
- )
-
- userOpts.foreach { case (key, opt) =>
- opt.foreach { value => builderProperties.put(key, value) }
- }
-
- builderProperties.getOrElseUpdate("spark.app.name", s"livy-session-$id")
-
- info(s"Creating Interactive session $id: [owner: $owner, request: $request]")
- val builder = new LivyClientBuilder()
- .setAll(builderProperties.asJava)
- .setConf("livy.client.session-id", id.toString)
- .setConf(RSCConf.Entry.DRIVER_CLASS.key(), "com.cloudera.livy.repl.ReplDriver")
- .setConf(RSCConf.Entry.PROXY_USER.key(), proxyUser.orNull)
- .setURI(new URI("rsc:/"))
-
- Option(builder.build().asInstanceOf[RSCClient])
- }
-
- new InteractiveSession(
- id,
- None,
- appTag,
- client,
- SessionState.Starting(),
- request.kind,
- request.heartbeatTimeoutInSecond,
- livyConf,
- owner,
- proxyUser,
- sessionStore,
- mockApp)
- }
-
- def recover(
- metadata: InteractiveRecoveryMetadata,
- livyConf: LivyConf,
- sessionStore: SessionStore,
- mockApp: Option[SparkApp] = None,
- mockClient: Option[RSCClient] = None): InteractiveSession = {
- val client = mockClient.orElse(metadata.rscDriverUri.map { uri =>
- val builder = new LivyClientBuilder().setURI(uri)
- builder.build().asInstanceOf[RSCClient]
- })
-
- new InteractiveSession(
- metadata.id,
- metadata.appId,
- metadata.appTag,
- client,
- SessionState.Recovering(),
- metadata.kind,
- metadata.heartbeatTimeoutS,
- livyConf,
- metadata.owner,
- metadata.proxyUser,
- sessionStore,
- mockApp)
- }
-
- @VisibleForTesting
- private[interactive] def prepareBuilderProp(
- conf: Map[String, String],
- kind: Kind,
- livyConf: LivyConf): mutable.Map[String, String] = {
-
- val builderProperties = mutable.Map[String, String]()
- builderProperties ++= conf
-
- def livyJars(livyConf: LivyConf, scalaVersion: String): List[String] = {
- Option(livyConf.get(LivyConf.REPL_JARS)).map { jars =>
- val regex = """[\w-]+_(\d\.\d\d).*\.jar""".r
- jars.split(",").filter { name => new Path(name).getName match {
- // Filter out unmatched scala jars
- case regex(ver) => ver == scalaVersion
- // Keep all the java jars end with ".jar"
- case _ => name.endsWith(".jar")
- }
- }.toList
- }.getOrElse {
- val home = sys.env("LIVY_HOME")
- val jars = Option(new File(home, s"repl_$scalaVersion-jars"))
- .filter(_.isDirectory())
- .getOrElse(new File(home, s"repl/scala-$scalaVersion/target/jars"))
- require(jars.isDirectory(), "Cannot find Livy REPL jars.")
- jars.listFiles().map(_.getAbsolutePath()).toList
- }
- }
-
- def findSparkRArchive(): Option[String] = {
- Option(livyConf.get(RSCConf.Entry.SPARKR_PACKAGE.key())).orElse {
- sys.env.get("SPARK_HOME").map { case sparkHome =>
- val path = Seq(sparkHome, "R", "lib", "sparkr.zip").mkString(File.separator)
- val rArchivesFile = new File(path)
- require(rArchivesFile.exists(), "sparkr.zip not found; cannot run sparkr application.")
- rArchivesFile.getAbsolutePath()
- }
- }
- }
-
- def datanucleusJars(livyConf: LivyConf, sparkMajorVersion: Int): Seq[String] = {
- if (sys.env.getOrElse("LIVY_INTEGRATION_TEST", "false").toBoolean) {
- // datanucleus jars has already been in classpath in integration test
- Seq.empty
- } else {
- val sparkHome = livyConf.sparkHome().get
- val libdir = sparkMajorVersion match {
- case 1 =>
- if (new File(sparkHome, "RELEASE").isFile) {
- new File(sparkHome, "lib")
- } else {
- new File(sparkHome, "lib_managed/jars")
- }
- case 2 =>
- if (new File(sparkHome, "RELEASE").isFile) {
- new File(sparkHome, "jars")
- } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) {
- new File(sparkHome, "assembly/target/scala-2.11/jars")
- } else {
- new File(sparkHome, "assembly/target/scala-2.10/jars")
- }
- case v =>
- throw new RuntimeException("Unsupported spark major version:" + sparkMajorVersion)
- }
- val jars = if (!libdir.isDirectory) {
- Seq.empty[String]
- } else {
- libdir.listFiles().filter(_.getName.startsWith("datanucleus-"))
- .map(_.getAbsolutePath).toSeq
- }
- if (jars.isEmpty) {
- warn("datanucleus jars can not be found")
- }
- jars
- }
- }
-
- /**
- * Look for hive-site.xml (for now just ignore spark.files defined in spark-defaults.conf)
- * 1. First look for hive-site.xml in user request
- * 2. Then look for that under classpath
- * @param livyConf
- * @return (hive-site.xml path, whether it is provided by user)
- */
- def hiveSiteFile(sparkFiles: Array[String], livyConf: LivyConf): (Option[File], Boolean) = {
- if (sparkFiles.exists(_.split("/").last == "hive-site.xml")) {
- (None, true)
- } else {
- val hiveSiteURL = getClass.getResource("/hive-site.xml")
- if (hiveSiteURL != null && hiveSiteURL.getProtocol == "file") {
- (Some(new File(hiveSiteURL.toURI)), false)
- } else {
- (None, false)
- }
- }
- }
-
- def findPySparkArchives(): Seq[String] = {
- Option(livyConf.get(RSCConf.Entry.PYSPARK_ARCHIVES))
- .map(_.split(",").toSeq)
- .getOrElse {
- sys.env.get("SPARK_HOME") .map { case sparkHome =>
- val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
- val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
- require(pyArchivesFile.exists(),
- "pyspark.zip not found; cannot run pyspark application in YARN mode.")
-
- val py4jFile = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip")
- .iterator()
- .next()
- .toFile
-
- require(py4jFile.exists(),
- "py4j-*-src.zip not found; cannot run pyspark application in YARN mode.")
- Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath)
- }.getOrElse(Seq())
- }
- }
-
- def mergeConfList(list: Seq[String], key: String): Unit = {
- if (list.nonEmpty) {
- builderProperties.get(key) match {
- case None =>
- builderProperties.put(key, list.mkString(","))
- case Some(oldList) =>
- val newList = (oldList :: list.toList).mkString(",")
- builderProperties.put(key, newList)
- }
- }
- }
-
- def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = {
- val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String])
- hiveSiteFile(sparkFiles, livyConf) match {
- case (_, true) =>
- debug("Enable HiveContext because hive-site.xml is found in user request.")
- mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
- case (Some(file), false) =>
- debug("Enable HiveContext because hive-site.xml is found under classpath, "
- + file.getAbsolutePath)
- mergeConfList(List(file.getAbsolutePath), LivyConf.SPARK_FILES)
- mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
- case (None, false) =>
- warn("Enable HiveContext but no hive-site.xml found under" +
- " classpath or user request.")
- }
- }
-
- kind match {
- case PySpark() | PySpark3() =>
- val pySparkFiles = if (!LivyConf.TEST_MODE) findPySparkArchives() else Nil
- mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES)
- builderProperties.put(SPARK_YARN_IS_PYTHON, "true")
- case SparkR() =>
- val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
- sparkRArchive.foreach { archive =>
- builderProperties.put(RSCConf.Entry.SPARKR_PACKAGE.key(), archive + "#sparkr")
- }
- case _ =>
- }
- builderProperties.put(RSCConf.Entry.SESSION_KIND.key, kind.toString)
-
- // Set Livy.rsc.jars from livy conf to rsc conf, RSC conf will take precedence if both are set.
- Option(livyConf.get(LivyConf.RSC_JARS)).foreach(
- builderProperties.getOrElseUpdate(RSCConf.Entry.LIVY_JARS.key(), _))
-
- require(livyConf.get(LivyConf.LIVY_SPARK_VERSION) != null)
- require(livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION) != null)
-
- val (sparkMajorVersion, _) =
- LivySparkUtils.formatSparkVersion(livyConf.get(LivyConf.LIVY_SPARK_VERSION))
- val scalaVersion = livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION)
-
- mergeConfList(livyJars(livyConf, scalaVersion), LivyConf.SPARK_JARS)
- val enableHiveContext = livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
- // pass spark.livy.spark_major_version to driver
- builderProperties.put("spark.livy.spark_major_version", sparkMajorVersion.toString)
-
- if (sparkMajorVersion <= 1) {
- builderProperties.put("spark.repl.enableHiveContext",
- livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT).toString)
- } else {
- val confVal = if (enableHiveContext) "hive" else "in-memory"
- builderProperties.put("spark.sql.catalogImplementation", confVal)
- }
-
- if (enableHiveContext) {
- mergeHiveSiteAndHiveDeps(sparkMajorVersion)
- }
-
- builderProperties
- }
-}
-
-class InteractiveSession(
- id: Int,
- appIdHint: Option[String],
- appTag: String,
- client: Option[RSCClient],
- initialState: SessionState,
- val kind: Kind,
- heartbeatTimeoutS: Int,
- livyConf: LivyConf,
- owner: String,
- override val proxyUser: Option[String],
- sessionStore: SessionStore,
- mockApp: Option[SparkApp]) // For unit test.
- extends Session(id, owner, livyConf)
- with SessionHeartbeat
- with SparkAppListener {
-
- import InteractiveSession._
-
- private var serverSideState: SessionState = initialState
-
- override protected val heartbeatTimeout: FiniteDuration = {
- val heartbeatTimeoutInSecond = heartbeatTimeoutS
- Duration(heartbeatTimeoutInSecond, TimeUnit.SECONDS)
- }
- private val operations = mutable.Map[Long, String]()
- private val operationCounter = new AtomicLong(0)
- private var rscDriverUri: Option[URI] = None
- private var sessionLog: IndexedSeq[String] = IndexedSeq.empty
- private val sessionSaveLock = new Object()
-
- _appId = appIdHint
- sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
- heartbeat()
-
- private val app = mockApp.orElse {
- if (livyConf.isRunningOnYarn()) {
- val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
- .map(new LineBufferedProcess(_))
- // When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
- // (e.g. Reflect YARN application state to session state).
- Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
- } else {
- // When Livy is running with other cluster manager, SparkApp doesn't provide any
- // additional benefit over controlling RSCDriver using RSCClient. Don't use it.
- None
- }
- }
-
- if (client.isEmpty) {
- transition(Dead())
- val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown."
- info(msg)
- sessionLog = IndexedSeq(msg)
- } else {
- val uriFuture = Future { client.get.getServerUri.get() }
-
- uriFuture onSuccess { case url =>
- rscDriverUri = Option(url)
- sessionSaveLock.synchronized {
- sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
- }
- }
- uriFuture onFailure { case e => warn("Fail to get rsc uri", e) }
-
- // Send a dummy job that will return once the client is ready to be used, and set the
- // state to "idle" at that point.
- client.get.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() {
- override def onJobQueued(job: JobHandle[Void]): Unit = { }
- override def onJobStarted(job: JobHandle[Void]): Unit = { }
-
- override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()
-
- override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut()
-
- override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
- transition(SessionState.Running())
- info(s"Interactive session $id created [appid: ${appId.orNull}, owner: $owner, proxyUser:" +
- s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " +
- s"info: ${appInfo.asJavaMap}]")
- }
-
- private def errorOut(): Unit = {
- // Other code might call stop() to close the RPC channel. When RPC channel is closing,
- // this callback might be triggered. Check and don't call stop() to avoid nested called
- // if the session is already shutting down.
- if (serverSideState != SessionState.ShuttingDown()) {
- transition(SessionState.Error())
- stop()
- app.foreach { a =>
- info(s"Failed to ping RSC driver for session $id. Killing application.")
- a.kill()
- }
- }
- }
- })
- }
-
- override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(sessionLog)
-
- override def recoveryMetadata: RecoveryMetadata =
- InteractiveRecoveryMetadata(
- id, appId, appTag, kind, heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)
-
- override def state: SessionState = {
- if (serverSideState.isInstanceOf[SessionState.Running]) {
- // If session is in running state, return the repl state from RSCClient.
- client
- .flatMap(s => Option(s.getReplState))
- .map(SessionState(_))
- .getOrElse(SessionState.Busy()) // If repl state is unknown, assume repl is busy.
- } else {
- serverSideState
- }
- }
-
- override def stopSession(): Unit = {
- try {
- transition(SessionState.ShuttingDown())
- sessionStore.remove(RECOVERY_SESSION_TYPE, id)
- client.foreach { _.stop(true) }
- } catch {
- case _: Exception =>
- app.foreach {
- warn(s"Failed to stop RSCDriver. Killing it...")
- _.kill()
- }
- } finally {
- transition(SessionState.Dead())
- }
- }
-
- def statements: IndexedSeq[Statement] = {
- ensureActive()
- val r = client.get.getReplJobResults().get()
- r.statements.toIndexedSeq
- }
-
- def getStatement(stmtId: Int): Option[Statement] = {
- ensureActive()
- val r = client.get.getReplJobResults(stmtId, 1).get()
- if (r.statements.length < 1) {
- None
- } else {
- Option(r.statements(0))
- }
- }
-
- def interrupt(): Future[Unit] = {
- stop()
- }
-
- def executeStatement(content: ExecuteRequest): Statement = {
- ensureRunning()
- recordActivity()
-
- val id = client.get.submitReplCode(content.code).get
- client.get.getReplJobResults(id, 1).get().statements(0)
- }
-
- def cancelStatement(statementId: Int): Unit = {
- ensureRunning()
- recordActivity()
- client.get.cancelReplCode(statementId)
- }
-
- def runJob(job: Array[Byte]): Long = {
- performOperation(job, true)
- }
-
- def submitJob(job: Array[Byte]): Long = {
- performOperation(job, false)
- }
-
- def addFile(fileStream: InputStream, fileName: String): Unit = {
- addFile(copyResourceToHDFS(fileStream, fileName))
- }
-
- def addJar(jarStream: InputStream, jarName: String): Unit = {
- addJar(copyResourceToHDFS(jarStream, jarName))
- }
-
- def addFile(uri: URI): Unit = {
- ensureActive()
- recordActivity()
- client.get.addFile(resolveURI(uri, livyConf)).get()
- }
-
- def addJar(uri: URI): Unit = {
- ensureActive()
- recordActivity()
- client.get.addJar(resolveURI(uri, livyConf)).get()
- }
-
- def jobStatus(id: Long): Any = {
- ensureActive()
- val clientJobId = operations(id)
- recordActivity()
- // TODO: don't block indefinitely?
- val status = client.get.getBypassJobStatus(clientJobId).get()
- new JobStatus(id, status.state, status.result, status.error)
- }
-
- def cancelJob(id: Long): Unit = {
- ensureActive()
- recordActivity()
- operations.remove(id).foreach { client.get.cancel }
- }
-
- private def transition(newState: SessionState) = synchronized {
- // When a statement returns an error, the session should transit to error state.
- // If the session crashed because of the error, the session should instead go to dead state.
- // Since these 2 transitions are triggered by different threads, there's a race condition.
- // Make sure we won't transit from dead to error state.
- val areSameStates = serverSideState.getClass() == newState.getClass()
- val transitFromInactiveToActive = !serverSideState.isActive && newState.isActive
- if (!areSameStates && !transitFromInactiveToActive) {
- debug(s"$this session state change from ${serverSideState} to $newState")
- serverSideState = newState
- }
- }
-
- private def ensureActive(): Unit = synchronized {
- require(serverSideState.isActive, "Session isn't active.")
- require(client.isDefined, "Session is active but client hasn't been created.")
- }
-
- private def ensureRunning(): Unit = synchronized {
- serverSideState match {
- case SessionState.Running() =>
- case _ =>
- throw new IllegalStateException("Session is in state %s" format serverSideState)
- }
- }
-
- private def performOperation(job: Array[Byte], sync: Boolean): Long = {
- ensureActive()
- recordActivity()
- val future = client.get.bypass(ByteBuffer.wrap(job), sync)
- val opId = operationCounter.incrementAndGet()
- operations(opId) = future
- opId
- }
-
- override def appIdKnown(appId: String): Unit = {
- _appId = Option(appId)
- sessionSaveLock.synchronized {
- sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
- }
- }
-
- override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
- synchronized {
- debug(s"$this app state changed from $oldState to $newState")
- newState match {
- case SparkApp.State.FINISHED | SparkApp.State.KILLED | SparkApp.State.FAILED =>
- transition(SessionState.Dead())
- case _ =>
- }
- }
- }
-
- override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala
deleted file mode 100644
index c2263db..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.interactive
-
-import java.net.URI
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.JavaConverters._
-import scala.concurrent._
-import scala.concurrent.duration._
-
-import org.json4s.jackson.Json4sScalaModule
-import org.scalatra._
-import org.scalatra.servlet.FileUploadSupport
-
-import com.cloudera.livy.{ExecuteRequest, JobHandle, LivyConf, Logging}
-import com.cloudera.livy.client.common.HttpMessages
-import com.cloudera.livy.client.common.HttpMessages._
-import com.cloudera.livy.rsc.driver.Statement
-import com.cloudera.livy.server.SessionServlet
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions._
-
-object InteractiveSessionServlet extends Logging
-
-class InteractiveSessionServlet(
- sessionManager: InteractiveSessionManager,
- sessionStore: SessionStore,
- livyConf: LivyConf)
- extends SessionServlet(sessionManager, livyConf)
- with SessionHeartbeatNotifier[InteractiveSession, InteractiveRecoveryMetadata]
- with FileUploadSupport
-{
-
- mapper.registerModule(new SessionKindModule())
- .registerModule(new Json4sScalaModule())
-
- override protected def createSession(req: HttpServletRequest): InteractiveSession = {
- val createRequest = bodyAs[CreateInteractiveRequest](req)
- val proxyUser = checkImpersonation(createRequest.proxyUser, req)
- InteractiveSession.create(
- sessionManager.nextId(),
- remoteUser(req),
- proxyUser,
- livyConf,
- createRequest,
- sessionStore)
- }
-
- override protected[interactive] def clientSessionView(
- session: InteractiveSession,
- req: HttpServletRequest): Any = {
- val logs =
- if (hasAccess(session.owner, req)) {
- Option(session.logLines())
- .map { lines =>
- val size = 10
- var from = math.max(0, lines.length - size)
- val until = from + size
-
- lines.view(from, until)
- }
- .getOrElse(Nil)
- } else {
- Nil
- }
-
- new SessionInfo(session.id, session.appId.orNull, session.owner, session.proxyUser.orNull,
- session.state.toString, session.kind.toString, session.appInfo.asJavaMap, logs.asJava)
- }
-
- post("/:id/stop") {
- withSession { session =>
- Await.ready(session.stop(), Duration.Inf)
- NoContent()
- }
- }
-
- post("/:id/interrupt") {
- withSession { session =>
- Await.ready(session.interrupt(), Duration.Inf)
- Ok(Map("msg" -> "interrupted"))
- }
- }
-
- get("/:id/statements") {
- withSession { session =>
- val statements = session.statements
- val from = params.get("from").map(_.toInt).getOrElse(0)
- val size = params.get("size").map(_.toInt).getOrElse(statements.length)
-
- Map(
- "total_statements" -> statements.length,
- "statements" -> statements.view(from, from + size)
- )
- }
- }
-
- val getStatement = get("/:id/statements/:statementId") {
- withSession { session =>
- val statementId = params("statementId").toInt
-
- session.getStatement(statementId).getOrElse(NotFound("Statement not found"))
- }
- }
-
- jpost[ExecuteRequest]("/:id/statements") { req =>
- withSession { session =>
- val statement = session.executeStatement(req)
-
- Created(statement,
- headers = Map(
- "Location" -> url(getStatement,
- "id" -> session.id.toString,
- "statementId" -> statement.id.toString)))
- }
- }
-
- post("/:id/statements/:statementId/cancel") {
- withSession { session =>
- val statementId = params("statementId")
- session.cancelStatement(statementId.toInt)
- Ok(Map("msg" -> "canceled"))
- }
- }
- // This endpoint is used by the client-http module to "connect" to an existing session and
- // update its last activity time. It performs authorization checks to make sure the caller
- // has access to the session, so even though it returns the same data, it behaves differently
- // from get("/:id").
- post("/:id/connect") {
- withSession { session =>
- session.recordActivity()
- Ok(clientSessionView(session, request))
- }
- }
-
- jpost[SerializedJob]("/:id/submit-job") { req =>
- withSession { session =>
- try {
- require(req.job != null && req.job.length > 0, "no job provided.")
- val jobId = session.submitJob(req.job)
- Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
- } catch {
- case e: Throwable =>
- e.printStackTrace()
- throw e
- }
- }
- }
-
- jpost[SerializedJob]("/:id/run-job") { req =>
- withSession { session =>
- require(req.job != null && req.job.length > 0, "no job provided.")
- val jobId = session.runJob(req.job)
- Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
- }
- }
-
- post("/:id/upload-jar") {
- withSession { lsession =>
- fileParams.get("jar") match {
- case Some(file) =>
- lsession.addJar(file.getInputStream, file.name)
- case None =>
- BadRequest("No jar sent!")
- }
- }
- }
-
- post("/:id/upload-pyfile") {
- withSession { lsession =>
- fileParams.get("file") match {
- case Some(file) =>
- lsession.addJar(file.getInputStream, file.name)
- case None =>
- BadRequest("No file sent!")
- }
- }
- }
-
- post("/:id/upload-file") {
- withSession { lsession =>
- fileParams.get("file") match {
- case Some(file) =>
- lsession.addFile(file.getInputStream, file.name)
- case None =>
- BadRequest("No file sent!")
- }
- }
- }
-
- jpost[AddResource]("/:id/add-jar") { req =>
- withSession { lsession =>
- addJarOrPyFile(req, lsession)
- }
- }
-
- jpost[AddResource]("/:id/add-pyfile") { req =>
- withSession { lsession =>
- lsession.kind match {
- case PySpark() | PySpark3() => addJarOrPyFile(req, lsession)
- case _ => BadRequest("Only supported for pyspark sessions.")
- }
- }
- }
-
- jpost[AddResource]("/:id/add-file") { req =>
- withSession { lsession =>
- val uri = new URI(req.uri)
- lsession.addFile(uri)
- }
- }
-
- get("/:id/jobs/:jobid") {
- withSession { lsession =>
- val jobId = params("jobid").toLong
- Ok(lsession.jobStatus(jobId))
- }
- }
-
- post("/:id/jobs/:jobid/cancel") {
- withSession { lsession =>
- val jobId = params("jobid").toLong
- lsession.cancelJob(jobId)
- }
- }
-
- private def addJarOrPyFile(req: HttpMessages.AddResource, session: InteractiveSession): Unit = {
- val uri = new URI(req.uri)
- session.addJar(uri)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/interactive/SessionHeartbeat.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/SessionHeartbeat.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/SessionHeartbeat.scala
deleted file mode 100644
index 20bc582..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/SessionHeartbeat.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.server.interactive
-
-import java.util.Date
-
-import scala.concurrent.duration.{Deadline, Duration, FiniteDuration}
-
-import com.cloudera.livy.sessions.Session.RecoveryMetadata
-import com.cloudera.livy.LivyConf
-import com.cloudera.livy.server.SessionServlet
-import com.cloudera.livy.sessions.{Session, SessionManager}
-
-/**
- * A session trait to provide heartbeat expiration check.
- * Note: Session will not expire if heartbeat() was never called.
- */
-trait SessionHeartbeat {
- protected val heartbeatTimeout: FiniteDuration
-
- private var _lastHeartbeat: Date = _ // For reporting purpose
- private var heartbeatDeadline: Option[Deadline] = None
-
- def heartbeat(): Unit = synchronized {
- if (heartbeatTimeout > Duration.Zero) {
- heartbeatDeadline = Some(heartbeatTimeout.fromNow)
- }
-
- _lastHeartbeat = new Date()
- }
-
- def lastHeartbeat: Date = synchronized { _lastHeartbeat }
-
- def heartbeatExpired: Boolean = synchronized { heartbeatDeadline.exists(_.isOverdue()) }
-}
-
-/**
- * Servlet can mixin this trait to update session's heartbeat
- * whenever a /sessions/:id REST call is made. e.g. GET /sessions/:id
- * Note: GET /sessions doesn't update heartbeats.
- */
-trait SessionHeartbeatNotifier[S <: Session with SessionHeartbeat, R <: RecoveryMetadata]
- extends SessionServlet[S, R] {
-
- abstract override protected def withUnprotectedSession(fn: (S => Any)): Any = {
- super.withUnprotectedSession { s =>
- s.heartbeat()
- fn(s)
- }
- }
-
- abstract override protected def withSession(fn: (S => Any)): Any = {
- super.withSession { s =>
- s.heartbeat()
- fn(s)
- }
- }
-}
-
-/**
- * A SessionManager trait.
- * It will create a thread that periodically deletes sessions with expired heartbeat.
- */
-trait SessionHeartbeatWatchdog[S <: Session with SessionHeartbeat, R <: RecoveryMetadata] {
- self: SessionManager[S, R] =>
-
- private val watchdogThread = new Thread(s"HeartbeatWatchdog-${self.getClass.getName}") {
- override def run(): Unit = {
- val interval = livyConf.getTimeAsMs(LivyConf.HEARTBEAT_WATCHDOG_INTERVAL)
- info("Heartbeat watchdog thread started.")
- while (true) {
- deleteExpiredSessions()
- Thread.sleep(interval)
- }
- }
- }
-
- protected def start(): Unit = {
- assert(!watchdogThread.isAlive())
-
- watchdogThread.setDaemon(true)
- watchdogThread.start()
- }
-
- private[interactive] def deleteExpiredSessions(): Unit = {
- // Delete takes time. If we use .filter().foreach() here, the time difference between we check
- // expiration and the time we delete the session might be huge. To avoid that, check expiration
- // inside the foreach block.
- sessions.values.foreach { s =>
- if (s.heartbeatExpired) {
- info(s"Session ${s.id} expired. Last heartbeat is at ${s.lastHeartbeat}.")
- try { delete(s) } catch {
- case t: Throwable =>
- warn(s"Exception was thrown when deleting expired session ${s.id}", t)
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala
deleted file mode 100644
index 89b133e..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import scala.reflect.ClassTag
-
-import com.cloudera.livy.LivyConf
-
-/**
- * No-op StateStore: writes and removals are discarded and every lookup comes back empty.
- * Livy uses it when session recovery is disabled.
- */
-class BlackholeStateStore(livyConf: LivyConf) extends StateStore(livyConf) {
-  def set(key: String, value: Object): Unit = ()
-
-  def get[T: ClassTag](key: String): Option[T] = Option.empty[T]
-
-  def getChildren(key: String): Seq[String] = Nil
-
-  def remove(key: String): Unit = ()
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala
deleted file mode 100644
index 5e17678..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import java.io.{FileNotFoundException, IOException}
-import java.net.URI
-import java.util
-
-import scala.reflect.ClassTag
-import scala.util.control.NonFatal
-
-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
-import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.Utils.usingResource
-
-/**
- * StateStore that keeps each key as a file below the path of the URI configured in
- * LivyConf.RECOVERY_STATE_STORE_URL, accessed through a Hadoop FileContext.
- *
- * On construction it sets the FileContext umask to 077, creates the state store directory
- * if needed and checks the permissions of that directory.
- *
- * @param livyConf Livy configuration; the state store URL must be non-empty.
- * @param mockFileContext FileContext used instead of the one derived from the URL when
- *                        defined (presumably for tests, judging by the name).
- */
-class FileSystemStateStore(
-    livyConf: LivyConf,
-    mockFileContext: Option[FileContext])
-  extends StateStore(livyConf) with Logging {
-
-  // Constructor defined for StateStore factory to new this class using reflection.
-  def this(livyConf: LivyConf) = this(livyConf, None)
-
-  private val fsUri = {
-    val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
-    require(!fsPath.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
-    new URI(fsPath)
-  }
-
-  private val fileContext: FileContext = mockFileContext.getOrElse {
-    FileContext.getFileContext(fsUri)
-  }
-
-  {
-    // Only Livy user should have access to state files.
-    fileContext.setUMask(new FsPermission("077"))
-
-    // Create state store dir if it doesn't exist.
-    val stateStorePath = absPath(".")
-    try {
-      fileContext.mkdir(stateStorePath, FsPermission.getDirDefault(), true)
-    } catch {
-      case _: FileAlreadyExistsException =>
-        if (!fileContext.getFileStatus(stateStorePath).isDirectory()) {
-          throw new IOException(s"$stateStorePath is not a directory.")
-        }
-    }
-
-    // Check permission of state store dir.
-    val permission = fileContext.getFileStatus(stateStorePath).getPermission
-    require(permission.getUserAction() == FsAction.ALL,
-      s"Livy doesn't have permission to access state store: $fsUri.")
-    if (permission.getGroupAction != FsAction.NONE) {
-      warn(s"Group users have permission to access state store: $fsUri. This is insecure.")
-    }
-    if (permission.getOtherAction != FsAction.NONE) {
-      warn(s"Other users have permission to access state store: $fsUri. This is insecure.")
-    }
-  }
-
-  /**
-   * Serializes `value` and stores it under `key`, replacing any existing value.
-   * The bytes go to "<key>.tmp" first and are then renamed over the final path.
-   */
-  override def set(key: String, value: Object): Unit = {
-    // Write to a temp file then rename to avoid file corruption if livy-server crashes
-    // in the middle of the write.
-    val tmpPath = absPath(s"$key.tmp")
-    val createFlag = util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-
-    usingResource(fileContext.create(tmpPath, createFlag, CreateOpts.createParent())) { tmpFile =>
-      tmpFile.write(serializeToBytes(value))
-      // Close before renaming so the rename acts on a fully written file.
-      tmpFile.close()
-      // Assume rename is atomic.
-      fileContext.rename(tmpPath, absPath(key), Rename.OVERWRITE)
-    }
-
-    // Best-effort removal of the checksum side file of the temp file; failures are ignored.
-    try {
-      val crcPath = new Path(tmpPath.getParent, s".${tmpPath.getName}.crc")
-      fileContext.delete(crcPath, false)
-    } catch {
-      case NonFatal(_) => // Swallow the exception.
-    }
-  }
-
-  /**
-   * Reads and deserializes the value stored under `key`.
-   * @return None when the file does not exist or an IOException occurs while reading
-   *         (the latter is logged as a warning).
-   */
-  override def get[T: ClassTag](key: String): Option[T] = {
-    try {
-      usingResource(fileContext.open(absPath(key))) { is =>
-        Option(deserialize[T](IOUtils.toByteArray(is)))
-      }
-    } catch {
-      case _: FileNotFoundException => None
-      case e: IOException =>
-        warn(s"Failed to read $key from state store.", e)
-        None
-    }
-  }
-
-  /**
-   * Lists the names of the entries directly below `key`.
-   * @return Empty when the path does not exist or listing fails with an IOException
-   *         (the latter is logged as a warning).
-   */
-  override def getChildren(key: String): Seq[String] = {
-    try {
-      fileContext.util.listStatus(absPath(key)).map(_.getPath.getName)
-    } catch {
-      case _: FileNotFoundException => Seq.empty
-      case e: IOException =>
-        warn(s"Failed to list $key from state store.", e)
-        Seq.empty
-    }
-  }
-
-  /** Deletes the file stored under `key` (non-recursive). A missing key is not an error. */
-  override def remove(key: String): Unit = {
-    try {
-      fileContext.delete(absPath(key), false)
-    } catch {
-      // StateStore.remove is documented as not throwing when the key doesn't exist.
-      case _: FileNotFoundException =>
-    }
-  }
-
-  // Keys are resolved relative to the path component of the state store URI.
-  private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala
deleted file mode 100644
index b7a178c..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.recovery
-
-import java.io.IOException
-
-import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
-import scala.util.control.NonFatal
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.sessions.Session.RecoveryMetadata
-
-// Persisted record of the next session id; written by SessionStore.saveNextSessionId and
-// read back by SessionStore.getNextSessionId.
-private[recovery] case class SessionManagerState(nextSessionId: Int)
-
-/**
- * High level helper to save and load session state through a StateStore.
- *
- * Sessions are stored under "<version>/<sessionType>/<id>" and the per-type manager state
- * under "<version>/<sessionType>/state".
- */
-class SessionStore(
-    livyConf: LivyConf,
-    store: => StateStore = StateStore.get) // For unit testing.
-  extends Logging {
-
-  private val STORE_VERSION: String = "v1"
-
-  /**
-   * Persist a session to the session state store.
-   * @param sessionType Session type, used as part of the key.
-   * @param m RecoveryMetadata for the session.
-   */
-  def save(sessionType: String, m: RecoveryMetadata): Unit =
-    store.set(sessionPath(sessionType, m.id), m)
-
-  /** Persist the next session id to hand out for the given session type. */
-  def saveNextSessionId(sessionType: String, id: Int): Unit =
-    store.set(sessionManagerPath(sessionType), SessionManagerState(nextSessionId = id))
-
-  /**
-   * Return all sessions stored in the store with specified session type.
-   * A session that cannot be read is reported as a Failure wrapping an IOException.
-   */
-  def getAllSessions[T <: RecoveryMetadata : ClassTag](sessionType: String): Seq[Try[T]] = {
-    val sessionIds = store.getChildren(sessionPath(sessionType))
-      .flatMap { child => Try(child.toInt).toOption } // Ignore all non numerical keys
-    sessionIds.flatMap { id => load[T](sessionPath(sessionType, id)) }
-  }
-
-  /**
-   * Return the next unused session id with specified session type.
-   * It checks the SessionManagerState stored and returns the next free session id.
-   * If no SessionManagerState is stored, it returns 0.
-   *
-   * @throws Exception If SessionManagerState stored is corrupted, it throws an error.
-   */
-  def getNextSessionId(sessionType: String): Int = {
-    val saved = store.get[SessionManagerState](sessionManagerPath(sessionType))
-    saved.fold(0)(_.nextSessionId)
-  }
-
-  /**
-   * Remove a session from the state store.
-   */
-  def remove(sessionType: String, id: Int): Unit =
-    store.remove(sessionPath(sessionType, id))
-
-  // Reads one session; None when nothing is stored at the path, Failure when reading throws.
-  private def load[T <: RecoveryMetadata : ClassTag](path: String): Option[Try[T]] = {
-    try {
-      store.get[T](path).map(Success(_))
-    } catch {
-      case NonFatal(cause) =>
-        Some(Failure(new IOException(s"Error getting session $path", cause)))
-    }
-  }
-
-  private def sessionManagerPath(sessionType: String): String =
-    sessionPath(sessionType) + "/state"
-
-  private def sessionPath(sessionType: String): String =
-    s"$STORE_VERSION/$sessionType"
-
-  private def sessionPath(sessionType: String, id: Int): String =
-    sessionPath(sessionType) + s"/$id"
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala
deleted file mode 100644
index 25eb238..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.server.recovery
-
-import scala.reflect.{classTag, ClassTag}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.sessions.SessionKindModule
-import com.cloudera.livy.sessions.SessionManager._
-
-// JSON (de)serialization helpers built on a Jackson ObjectMapper with the Scala module and
-// the SessionKindModule registered.
-protected trait JsonMapper {
-  protected val mapper = {
-    val jackson = new ObjectMapper()
-    jackson.registerModule(DefaultScalaModule)
-    jackson.registerModule(new SessionKindModule())
-  }
-
-  /** Serializes `value` to its JSON byte representation. */
-  def serializeToBytes(value: Object): Array[Byte] = {
-    mapper.writeValueAsBytes(value)
-  }
-
-  /** Parses `json` into an instance of the runtime class of `T`. */
-  def deserialize[T: ClassTag](json: Array[Byte]): T = {
-    val targetClass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
-    mapper.readValue(json, targetClass)
-  }
-}
-
-/**
- * Interface of a key-value pair storage for state storage.
- * It's responsible for de/serialization and retrieving/storing objects.
- * It's the low level interface used by higher level classes like SessionStore.
- *
- * Hardcoded to use JSON serialization (via JsonMapper) for now for easier ops.
- * Will add better serialization later.
- */
-abstract class StateStore(livyConf: LivyConf) extends JsonMapper {
- /**
- * Set a key-value pair to this state store. It overwrites existing value.
- * @param key Key to store the value under.
- * @param value Object to serialize and store.
- * @throws Exception Throw when persisting the state store fails.
- */
- def set(key: String, value: Object): Unit
-
- /**
- * Get the value stored under a key in this state store.
- * @param key Key to look up.
- * @return Value if the key exists. None if the key doesn't exist.
- * @throws Exception Throw when deserialization of the stored value fails.
- */
- def get[T: ClassTag](key: String): Option[T]
-
- /**
- * Treat keys in this state store as a directory tree and
- * return names of the direct children of the key.
- * @param key Key whose direct children are listed.
- * @return List of names of the direct children of the key.
- * Empty list if the key doesn't exist or has no child.
- */
- def getChildren(key: String): Seq[String]
-
- /**
- * Remove the key from this state store. Does not throw if the key doesn't exist.
- * @param key Key to remove.
- * @throws Exception Throw when persisting the state store fails.
- */
- def remove(key: String): Unit
-}
-
-/**
- * Holder and factory for the process-wide StateStore selected by LivyConf.
- */
-object StateStore extends Logging {
-  private[this] var stateStore: Option[StateStore] = None
-
-  /** Creates the configured store through its (LivyConf) constructor; no-op if already set. */
-  def init(livyConf: LivyConf): Unit = synchronized {
-    if (stateStore.isEmpty) {
-      val storeClass = pickStateStore(livyConf)
-      val constructor = storeClass.getDeclaredConstructor(classOf[LivyConf])
-      val instance = constructor.newInstance(livyConf).asInstanceOf[StateStore]
-      stateStore = Option(instance)
-      info(s"Using ${instance.getClass.getSimpleName} for recovery.")
-    }
-  }
-
-  /** Drops the current store so that a later init() creates a new one. */
-  def cleanup(): Unit = synchronized {
-    stateStore = None
-  }
-
-  /** Returns the initialized store; asserts that init() has been called. */
-  def get: StateStore = {
-    val current = stateStore
-    assert(current.isDefined, "StateStore hasn't been initialized.")
-    current.get
-  }
-
-  /**
-   * Maps the configured recovery mode (and, in recovery mode, the configured state store
-   * name) to the StateStore implementation class.
-   *
-   * @throws IllegalArgumentException If the recovery mode or state store is not supported.
-   */
-  private[recovery] def pickStateStore(livyConf: LivyConf): Class[_] = {
-    val recoveryMode = livyConf.get(LivyConf.RECOVERY_MODE)
-    recoveryMode match {
-      case SESSION_RECOVERY_MODE_OFF => classOf[BlackholeStateStore]
-      case SESSION_RECOVERY_MODE_RECOVERY => recoveryStoreClass(livyConf)
-      case rm => throw new IllegalArgumentException(s"Unsupported recovery mode $rm")
-    }
-  }
-
-  // Resolves the store implementation named by LivyConf.RECOVERY_STATE_STORE.
-  private def recoveryStoreClass(livyConf: LivyConf): Class[_] = {
-    val storeName = livyConf.get(LivyConf.RECOVERY_STATE_STORE)
-    storeName match {
-      case "filesystem" => classOf[FileSystemStateStore]
-      case "zookeeper" => classOf[ZooKeeperStateStore]
-      case ss => throw new IllegalArgumentException(s"Unsupported state store $ss")
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala
deleted file mode 100644
index 753ea22..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.server.recovery
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-import scala.util.Try
-
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.framework.api.UnhandledErrorListener
-import org.apache.curator.retry.RetryNTimes
-import org.apache.zookeeper.KeeperException.NoNodeException
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.LivyConf.Entry
-
-// Configuration entries specific to the ZooKeeper state store.
-// NOTE(review): the second Entry argument looks like the default value - confirm against
-// LivyConf.Entry.
-object ZooKeeperStateStore {
- // Prefix placed in front of every key: the store uses paths of the form "/<prefix>/<key>".
- val ZK_KEY_PREFIX_CONF = Entry("livy.server.recovery.zk-state-store.key-prefix", "livy")
- // Retry policy in the form "<max retry count>,<sleep ms between retry>".
- val ZK_RETRY_CONF = Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100")
-}
-
-/**
- * StateStore that keeps every key as a ZooKeeper node, accessed through a Curator client.
- * Keys are mapped to node paths of the form "/<key prefix>/<key>".
- *
- * Construction starts the Curator client, registers a JVM shutdown hook that closes it and
- * installs a listener that terminates the process on an unhandled Curator error.
- *
- * @param livyConf Livy configuration; the state store URL must be non-empty and the retry
- *                 policy must have the form "<max retry count>,<sleep ms between retry>".
- * @param mockCuratorClient Client used instead of a newly created one when defined.
- */
-class ZooKeeperStateStore(
-    livyConf: LivyConf,
-    mockCuratorClient: Option[CuratorFramework] = None) // For testing
-  extends StateStore(livyConf) with Logging {
-
-  import ZooKeeperStateStore._
-
-  // Constructor defined for StateStore factory to new this class using reflection.
-  def this(livyConf: LivyConf) = this(livyConf, None)
-
-  private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
-  require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
-  private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF)
-  private val retryValue = livyConf.get(ZK_RETRY_CONF)
-  // a regex to match patterns like "m, n" where m and n both are integer values
-  private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
-  private[recovery] val retryPolicy = retryValue match {
-    case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
-    // Name the retry-policy key here: that is the setting whose value failed to parse.
-    case _ => throw new IllegalArgumentException(
-      s"${ZK_RETRY_CONF.key} contains bad value: $retryValue. " +
-        "Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
-  }
-
-  private val curatorClient = mockCuratorClient.getOrElse {
-    CuratorFrameworkFactory.newClient(zkAddress, retryPolicy)
-  }
-
-  // Close the client when the JVM shuts down.
-  Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
-    override def run(): Unit = {
-      curatorClient.close()
-    }
-  }))
-
-  // An unhandled Curator error is treated as fatal for the whole server process.
-  curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener {
-    def unhandledError(message: String, e: Throwable): Unit = {
-      error(s"Fatal Zookeeper error. Shutting down Livy server.")
-      System.exit(1)
-    }
-  })
-  curatorClient.start()
-  // TODO Make sure ZK path has proper secure permissions so that other users cannot read its
-  // contents.
-
-  /** Serializes `value` and writes it to the node for `key`, creating the node if needed. */
-  override def set(key: String, value: Object): Unit = {
-    val prefixedKey = prefixKey(key)
-    val data = serializeToBytes(value)
-    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
-      curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data)
-    } else {
-      curatorClient.setData().forPath(prefixedKey, data)
-    }
-  }
-
-  /** Reads and deserializes the node for `key`; None when the node does not exist. */
-  override def get[T: ClassTag](key: String): Option[T] = {
-    val prefixedKey = prefixKey(key)
-    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
-      None
-    } else {
-      Option(deserialize[T](curatorClient.getData().forPath(prefixedKey)))
-    }
-  }
-
-  /** Names of the child nodes of `key`; empty when the node does not exist. */
-  override def getChildren(key: String): Seq[String] = {
-    val prefixedKey = prefixKey(key)
-    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
-      Seq.empty[String]
-    } else {
-      curatorClient.getChildren.forPath(prefixedKey).asScala
-    }
-  }
-
-  /** Deletes the node for `key`; a node that is already absent is ignored. */
-  override def remove(key: String): Unit = {
-    try {
-      curatorClient.delete().guaranteed().forPath(prefixKey(key))
-    } catch {
-      case _: NoNodeException =>
-    }
-  }
-
-  private def prefixKey(key: String) = s"/$zkKeyPrefix/$key"
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/server/ui/UIServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/ui/UIServlet.scala b/server/src/main/scala/com/cloudera/livy/server/ui/UIServlet.scala
deleted file mode 100644
index 26c5e26..0000000
--- a/server/src/main/scala/com/cloudera/livy/server/ui/UIServlet.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.server.ui
-
-import scala.xml.Node
-
-import org.scalatra.ScalatraServlet
-
-import com.cloudera.livy.LivyConf
-
-/**
- * Scalatra servlet that serves the HTML shell of the Livy web UI.
- * The page markup is built from XML literals; the root route renders the "Sessions" page.
- */
-class UIServlet extends ScalatraServlet {
- // Every response from this servlet is declared as HTML.
- before() { contentType = "text/html" }
-
- // Builds the <head> element: stylesheets, scripts under /static and the page title.
- def getHeader(title: String): Seq[Node] =
- <head>
- <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css"/>
- <link rel="stylesheet" href="/static/dataTables.bootstrap.min.css" type="text/css"/>
- <link rel="stylesheet" href="/static/livy-ui.css" type="text/css"/>
- <script src="/static/jquery-3.2.1.min.js"></script>
- <script src="/static/bootstrap.min.js"></script>
- <script src="/static/jquery.dataTables.min.js"></script>
- <script src="/static/dataTables.bootstrap.min.js"></script>
- <script src="/static/all-sessions.js"></script>
- <title>{title}</title>
- </head>
-
- // Builds the navigation bar showing the Livy logo and the current page name.
- def navBar(pageName: String): Seq[Node] =
- <nav class="navbar navbar-default">
- <div class="container-fluid">
- <div class="navbar-header">
- <a class="navbar-brand" href="#">
- <img alt="Livy" src="/static/livy-mini-logo.png"/>
- </a>
- </div>
- <div class="collapse navbar-collapse">
- <ul class="nav navbar-nav">
- <li><a href="#">{pageName}</a></li>
- </ul>
- </div>
- </div>
- </nav>
-
- // Wraps pageContents in a full HTML document with the header (titled "Livy - <pageName>")
- // and the navigation bar.
- def createPage(pageName: String, pageContents: Seq[Node]): Seq[Node] =
- <html>
- {getHeader("Livy - " + pageName)}
- <body>
- <div class="container">
- {navBar(pageName)}
- {pageContents}
- </div>
- </body>
- </html>
-
- // Root route: renders the "Sessions" page with empty placeholder elements.
- // NOTE(review): the placeholders are presumably filled in by /static/all-sessions.js - confirm.
- get("/") {
- val content =
- <div id="all-sessions">
- <div id="interactive-sessions"></div>
- <div id="batches"></div>
- </div>
-
- createPage("Sessions", content)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala b/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
deleted file mode 100644
index af9ae73..0000000
--- a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.sessions
-
-import java.io.InputStream
-import java.net.{URI, URISyntaxException}
-import java.security.PrivilegedExceptionAction
-import java.util.UUID
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.security.UserGroupInformation
-
-import com.cloudera.livy.{LivyConf, Logging, Utils}
-import com.cloudera.livy.utils.AppInfo
-
-/**
- * Helpers shared by all session types: recovery metadata, the blacklist of configuration
- * keys and validation/resolution of user-provided configuration and file URIs.
- */
-object Session {
-  /** State persisted for session recovery; every implementation carries the session id. */
-  trait RecoveryMetadata { val id: Int }
-
-  /**
-   * Configuration keys that users are not allowed to set, loaded from the classpath
-   * resource "/spark-blacklist.conf". Empty when the resource is not found.
-   */
-  lazy val configBlackList: Set[String] = {
-    val url = getClass.getResource("/spark-blacklist.conf")
-    if (url != null) Utils.loadProperties(url).keySet else Set()
-  }
-
-  /**
-   * Validates and prepares a user-provided configuration for submission.
-   *
-   * - Verifies that no blacklisted configurations are provided.
-   * - Merges file lists in the configuration with the explicit lists provided in the request
-   * - Resolve file URIs to make sure they reference the default FS
-   * - Verify that file URIs don't reference non-whitelisted local resources
-   *
-   * @return The user configuration plus the Spark master / deploy mode settings and the
-   *         merged file lists. An empty map when `conf` is null.
-   * @throws IllegalArgumentException If a blacklisted key is set or a URI is invalid.
-   */
-  def prepareConf(
-      conf: Map[String, String],
-      jars: Seq[String],
-      files: Seq[String],
-      archives: Seq[String],
-      pyFiles: Seq[String],
-      livyConf: LivyConf): Map[String, String] = {
-    // NOTE(review): a null conf short-circuits to an empty map, so the explicit file lists
-    // and the Spark master settings are not applied in that case - confirm this is intended.
-    if (conf == null) {
-      return Map()
-    }
-
-    val errors = conf.keySet.filter(configBlackList.contains)
-    if (errors.nonEmpty) {
-      throw new IllegalArgumentException(
-        "Blacklisted configuration values in session config: " + errors.mkString(", "))
-    }
-
-    // Every file-list key known to LivyConf starts out with an empty list.
-    val confLists: Map[String, Seq[String]] = livyConf.sparkFileLists
-      .map { key => (key -> Nil) }.toMap
-
-    val userLists = confLists ++ Map(
-      LivyConf.SPARK_JARS -> jars,
-      LivyConf.SPARK_FILES -> files,
-      LivyConf.SPARK_ARCHIVES -> archives,
-      LivyConf.SPARK_PY_FILES -> pyFiles)
-
-    // For each key, combine the request's list with the list found in the user conf
-    // (split on commas and/or spaces); keys whose combined list is empty are dropped.
-    val merged = userLists.flatMap { case (key, requestList) =>
-      val confList = conf.get(key)
-        .map { rawList => resolveURIs(rawList.split("[, ]+").toSeq, livyConf) }
-        .getOrElse(Nil)
-      val userList = resolveURIs(requestList, livyConf)
-      if (confList.nonEmpty || userList.nonEmpty) {
-        Some(key -> (userList ++ confList).mkString(","))
-      } else {
-        None
-      }
-    }
-
-    val masterConfList = Map(LivyConf.SPARK_MASTER -> livyConf.sparkMaster()) ++
-      livyConf.sparkDeployMode().map(LivyConf.SPARK_DEPLOY_MODE -> _).toMap
-
-    // Later maps win: server-side master settings and merged lists override user values.
-    conf ++ masterConfList ++ merged
-  }
-
-  /**
-   * Prepends the value of the "fs.defaultFS" configuration to any URIs that do not have a
-   * scheme. URIs are required to at least be absolute paths. Empty strings are skipped.
-   *
-   * @throws IllegalArgumentException If an invalid URI is found in the given list.
-   */
-  def resolveURIs(uris: Seq[String], livyConf: LivyConf): Seq[String] = {
-    uris.filter(_.nonEmpty).map { _uri =>
-      val uri = try {
-        new URI(_uri)
-      } catch {
-        case e: URISyntaxException => throw new IllegalArgumentException(e)
-      }
-      resolveURI(uri, livyConf).toString()
-    }
-  }
-
-  /**
-   * Resolves a single URI: a URI without a scheme must be an absolute path and is prefixed
-   * with "fs.defaultFS"; a "file" URI must start with one of the whitelisted local paths.
-   *
-   * @throws IllegalArgumentException If the path is not absolute or a local path is not
-   *                                  whitelisted (raised by `require`).
-   */
-  def resolveURI(uri: URI, livyConf: LivyConf): URI = {
-    val defaultFS = livyConf.hadoopConf.get("fs.defaultFS").stripSuffix("/")
-    val resolved =
-      if (uri.getScheme() == null) {
-        require(uri.getPath().startsWith("/"), s"Path '${uri.getPath()}' is not absolute.")
-        new URI(defaultFS + uri.getPath())
-      } else {
-        uri
-      }
-
-    if (resolved.getScheme() == "file") {
-      // Make sure the location is whitelisted before allowing local files to be added.
-      require(livyConf.localFsWhitelist.exists(resolved.getPath().startsWith),
-        s"Local path ${uri.getPath()} cannot be added to user sessions.")
-    }
-
-    resolved
-  }
-}
-
-/**
- * Base class of Livy sessions. Tracks the last activity time, the application id/info and
- * an optional staging directory, and provides helpers to run code as the session user
- * and to upload resources into the staging directory.
- *
- * @param id Session id.
- * @param owner Owner of the session; used as the effective user when no proxy user is set.
- * @param livyConf Livy configuration.
- */
-abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf)
- extends Logging {
-
- import Session._
-
- // Execution context for the Future created by stop().
- protected implicit val executionContext = ExecutionContext.global
-
- protected var _appId: Option[String] = None
-
- // Updated by recordActivity(); a System.nanoTime() reading.
- private var _lastActivity = System.nanoTime()
-
- // Directory where the session's staging files are created. The directory is only accessible
- // to the session's effective user.
- private var stagingDir: Path = null
-
- def appId: Option[String] = _appId
-
- var appInfo: AppInfo = AppInfo()
-
- // For Error, Dead and Success states the time carried by the state is returned;
- // otherwise the time of the last recorded activity.
- def lastActivity: Long = state match {
- case SessionState.Error(time) => time
- case SessionState.Dead(time) => time
- case SessionState.Success(time) => time
- case _ => _lastActivity
- }
-
- def logLines(): IndexedSeq[String]
-
- // Marks the session as active now.
- def recordActivity(): Unit = {
- _lastActivity = System.nanoTime()
- }
-
- def recoveryMetadata: RecoveryMetadata
-
- def state: SessionState
-
- // Asynchronously stops the session and then deletes its staging directory, if one was
- // created. Exceptions from either step are logged as warnings and do not fail the Future.
- def stop(): Future[Unit] = Future {
- try {
- info(s"Stopping $this...")
- stopSession()
- info(s"Stopped $this.")
- } catch {
- case e: Exception =>
- warn(s"Error stopping session $id.", e)
- }
-
- try {
- // NOTE(review): stagingDir is read here without the lock taken in getStagingDir.
- if (stagingDir != null) {
- debug(s"Deleting session $id staging directory $stagingDir")
- doAsOwner {
- val fs = FileSystem.newInstance(livyConf.hadoopConf)
- try {
- fs.delete(stagingDir, true)
- } finally {
- fs.close()
- }
- }
- }
- } catch {
- case e: Exception =>
- warn(s"Error cleaning up session $id staging dir.", e)
- }
- }
-
-
- override def toString(): String = s"${this.getClass.getSimpleName} $id"
-
- // Session-type specific shutdown logic, invoked by stop().
- protected def stopSession(): Unit
-
- protected val proxyUser: Option[String]
-
- // Runs fn as the proxy user if set, otherwise as the owner. When neither is available
- // (user is null) fn runs directly. With security enabled a proxy UGI is only created if
- // impersonation is enabled; otherwise the current user's UGI is used. Without security
- // a remote-user UGI is created for the user.
- protected def doAsOwner[T](fn: => T): T = {
- val user = proxyUser.getOrElse(owner)
- if (user != null) {
- val ugi = if (UserGroupInformation.isSecurityEnabled) {
- if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
- UserGroupInformation.createProxyUser(user, UserGroupInformation.getCurrentUser())
- } else {
- UserGroupInformation.getCurrentUser()
- }
- } else {
- UserGroupInformation.createRemoteUser(user)
- }
- ugi.doAs(new PrivilegedExceptionAction[T] {
- override def run(): T = fn
- })
- } else {
- fn
- }
- }
-
- // Copies dataStream into a file called `name` inside the session's staging directory
- // (overwriting an existing file) and returns the URI of that file. Runs as the session
- // user. The caller's dataStream is not closed here.
- protected def copyResourceToHDFS(dataStream: InputStream, name: String): URI = doAsOwner {
- val fs = FileSystem.newInstance(livyConf.hadoopConf)
-
- try {
- val filePath = new Path(getStagingDir(fs), name)
- debug(s"Uploading user file to $filePath")
-
- val outFile = fs.create(filePath, true)
- // Copy in 512 KiB chunks until read() signals end of stream with -1.
- val buffer = new Array[Byte](512 * 1024)
- var read = -1
- try {
- while ({read = dataStream.read(buffer); read != -1}) {
- outFile.write(buffer, 0, read)
- }
- } finally {
- outFile.close()
- }
- filePath.toUri
- } finally {
- fs.close()
- }
- }
-
- // Lazily creates the staging directory on first use: a random UUID directory below
- // LivyConf.SESSION_STAGING_DIR, or below "<home dir>/.livy-sessions" when that setting is
- // null, with permission 700. Later calls return the same directory.
- private def getStagingDir(fs: FileSystem): Path = synchronized {
- if (stagingDir == null) {
- val stagingRoot = Option(livyConf.get(LivyConf.SESSION_STAGING_DIR)).getOrElse {
- new Path(fs.getHomeDirectory(), ".livy-sessions").toString()
- }
-
- val sessionDir = new Path(stagingRoot, UUID.randomUUID().toString())
- fs.mkdirs(sessionDir)
- fs.setPermission(sessionDir, new FsPermission("700"))
- stagingDir = sessionDir
- debug(s"Session $id staging directory is $stagingDir")
- }
- stagingDir
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
deleted file mode 100644
index 93f162e..0000000
--- a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.sessions
-
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.concurrent.duration.Duration
-import scala.reflect.ClassTag
-import scala.util.control.NonFatal
-
-import com.cloudera.livy.{LivyConf, Logging}
-import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
-import com.cloudera.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession, SessionHeartbeatWatchdog}
-import com.cloudera.livy.server.recovery.SessionStore
-import com.cloudera.livy.sessions.Session.RecoveryMetadata
-
-/** String constants naming the session recovery modes. */
-object SessionManager {
-  /** Mode value that `SessionManager.shutdown()` compares `LivyConf.RECOVERY_MODE` against. */
-  val SESSION_RECOVERY_MODE_OFF: String = "off"
-
-  /** Mode value "recovery". */
-  val SESSION_RECOVERY_MODE_RECOVERY: String = "recovery"
-}
-
-/**
- * [[SessionManager]] for [[BatchSession]]s, using the session type name "batch".
- *
- * @param livyConf Livy configuration, also handed to `BatchSession.recover`
- * @param sessionStore store handed to the base class and to `BatchSession.recover`
- * @param mockSessions when defined, these sessions are registered instead of recovered ones
- */
-class BatchSessionManager(
-    livyConf: LivyConf,
-    sessionStore: SessionStore,
-    mockSessions: Option[Seq[BatchSession]] = None)
-  extends SessionManager[BatchSession, BatchRecoveryMetadata](
-    livyConf,
-    metadata => BatchSession.recover(metadata, livyConf, sessionStore),
-    sessionStore,
-    "batch",
-    mockSessions)
-
-/**
- * [[SessionManager]] for [[InteractiveSession]]s, using the session type name "interactive"
- * and mixing in [[SessionHeartbeatWatchdog]].
- *
- * @param livyConf Livy configuration, also handed to `InteractiveSession.recover`
- * @param sessionStore store handed to the base class and to `InteractiveSession.recover`
- * @param mockSessions when defined, these sessions are registered instead of recovered ones
- */
-class InteractiveSessionManager(
-    livyConf: LivyConf,
-    sessionStore: SessionStore,
-    mockSessions: Option[Seq[InteractiveSession]] = None)
-  extends SessionManager[InteractiveSession, InteractiveRecoveryMetadata](
-    livyConf,
-    metadata => InteractiveSession.recover(metadata, livyConf, sessionStore),
-    sessionStore,
-    "interactive",
-    mockSessions)
-  with SessionHeartbeatWatchdog[InteractiveSession, InteractiveRecoveryMetadata] {
-
-  // Runs as part of construction. NOTE(review): start() is not defined in this class;
-  // presumably it is provided by SessionHeartbeatWatchdog - confirm.
-  start()
-}
-
-/**
- * Keeps the sessions of one session type in memory, keyed by session id, and records id
- * allocation and session removal in the [[SessionStore]].
- *
- * On construction the manager registers `mockSessions` when given, otherwise the sessions
- * recovered from the store, and then starts a daemon thread that calls `collectGarbage()`
- * once a minute.
- *
- * All reads and writes of the session map happen under this object's monitor; `all()`
- * returns a snapshot, so callers may iterate it while sessions are registered or deleted.
- *
- * @param livyConf Livy configuration
- * @param sessionRecovery rebuilds a session from its recovery metadata
- * @param sessionStore store used for the next session id, recovery metadata and removal
- * @param sessionType name under which this manager's sessions are kept in the store
- * @param mockSessions when defined, registered instead of the recovered sessions
- * @tparam S session type managed by this instance
- * @tparam R recovery metadata type read from the store
- */
-class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
-    protected val livyConf: LivyConf,
-    sessionRecovery: R => S,
-    sessionStore: SessionStore,
-    sessionType: String,
-    mockSessions: Option[Seq[S]] = None)
-  extends Logging {
-
-  import SessionManager._
-
-  protected implicit def executor: ExecutionContext = ExecutionContext.global
-
-  protected[this] final val idCounter = new AtomicInteger(0)
-  protected[this] final val sessions = mutable.LinkedHashMap[Int, S]()
-
-  private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK)
-  // Both limits are converted to nanoseconds because they are compared with System.nanoTime().
-  private[this] final val sessionTimeout =
-    TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT))
-  private[this] final val sessionStateRetainedInNanos =
-    TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME))
-
-  mockSessions.getOrElse(recover()).foreach(register)
-  new GarbageCollector().start()
-
-  /** Allocates the next session id and saves the following id in the session store. */
-  def nextId(): Int = synchronized {
-    val id = idCounter.getAndIncrement()
-    sessionStore.saveNextSessionId(sessionType, idCounter.get())
-    id
-  }
-
-  /** Adds `session` to the managed sessions under its id and returns it. */
-  def register(session: S): S = {
-    info(s"Registering new session ${session.id}")
-    synchronized {
-      sessions.put(session.id, session)
-    }
-    session
-  }
-
-  /** Looks up a managed session by id. */
-  def get(id: Int): Option[S] = synchronized {
-    sessions.get(id)
-  }
-
-  /** Number of managed sessions. */
-  def size(): Int = synchronized {
-    sessions.size
-  }
-
-  /**
-   * Snapshot of the managed sessions in registration order. A copy is returned because the
-   * map is mutated by other threads (request handling, the GC thread) while callers iterate.
-   */
-  def all(): Iterable[S] = synchronized {
-    sessions.values.toList
-  }
-
-  /** Deletes the session with the given id, if there is one. */
-  def delete(id: Int): Option[Future[Unit]] = {
-    get(id).map(session => delete(session))
-  }
-
-  /**
-   * Stops `session` and, once it has stopped, removes it from the session store and from this
-   * manager. The returned future fails if stopping or removing fails.
-   */
-  def delete(session: S): Future[Unit] = {
-    session.stop().map { case _ =>
-      try {
-        sessionStore.remove(sessionType, session.id)
-        synchronized {
-          sessions.remove(session.id)
-        }
-      } catch {
-        case NonFatal(e) =>
-          error("Exception was thrown during stop session:", e)
-          throw e
-      }
-    }
-  }
-
-  /**
-   * When the recovery mode is "off", stops every session and blocks until all of them have
-   * stopped. With any other recovery mode the sessions are left untouched.
-   */
-  def shutdown(): Unit = {
-    val recoveryEnabled = livyConf.get(LivyConf.RECOVERY_MODE) != SESSION_RECOVERY_MODE_OFF
-    if (!recoveryEnabled) {
-      // Start all stops first, then wait, so that sessions shut down concurrently.
-      all().map(_.stop()).foreach { future =>
-        Await.ready(future, Duration.Inf)
-      }
-    }
-  }
-
-  /**
-   * Deletes every expired session. A session in a finished state expires once it has been in
-   * that state for longer than the state retain time. Any other session expires after the
-   * session timeout has elapsed since its last activity, but only when the timeout check is
-   * enabled and the session is not a [[BatchSession]].
-   *
-   * @return a future completing when all deletions have completed
-   */
-  def collectGarbage(): Future[Iterable[Unit]] = {
-    def expired(session: Session): Boolean = {
-      session.state match {
-        case s: FinishedSessionState =>
-          System.nanoTime() - s.time > sessionStateRetainedInNanos
-        case _ =>
-          sessionTimeoutCheck &&
-            !session.isInstanceOf[BatchSession] &&
-            System.nanoTime() - session.lastActivity > sessionTimeout
-      }
-    }
-
-    Future.sequence(all().filter(expired).map(session => delete(session)))
-  }
-
-  /**
-   * Restores the id counter and the sessions from the session store. Metadata entries that
-   * could not be read are logged and skipped.
-   */
-  private def recover(): Seq[S] = {
-    // Recover next session id from state store and create SessionManager.
-    idCounter.set(sessionStore.getNextSessionId(sessionType))
-
-    // Retrieve session recovery metadata from state store.
-    val sessionMetadata = sessionStore.getAllSessions[R](sessionType)
-
-    // Recover session from session recovery metadata.
-    val recoveredSessions = sessionMetadata.flatMap(_.toOption).map(sessionRecovery)
-
-    info(s"Recovered ${recoveredSessions.length} $sessionType sessions." +
-      s" Next session id: $idCounter")
-
-    // Print recovery error.
-    sessionMetadata
-      .collect { case scala.util.Failure(ex) => ex }
-      .foreach(ex => error(ex.getMessage, ex.getCause))
-
-    recoveredSessions
-  }
-
-  /** Daemon thread that runs `collectGarbage()` once a minute for the life of the JVM. */
-  private class GarbageCollector extends Thread("session gc thread") {
-
-    setDaemon(true)
-
-    override def run(): Unit = {
-      while (true) {
-        try {
-          collectGarbage().failed.foreach { e =>
-            error("Failed to delete expired sessions:", e)
-          }
-        } catch {
-          // A failing pass must not end the thread, otherwise sessions would never be
-          // collected again. InterruptedException is not NonFatal and still stops the loop.
-          case NonFatal(e) =>
-            error("Exception was thrown during session garbage collection:", e)
-        }
-        Thread.sleep(60 * 1000)
-      }
-    }
-
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/utils/Clock.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/Clock.scala b/server/src/main/scala/com/cloudera/livy/utils/Clock.scala
deleted file mode 100644
index 76ad9d4..0000000
--- a/server/src/main/scala/com/cloudera/livy/utils/Clock.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.utils
-
-/**
- * A lot of Livy code relies on time related functions like Thread.sleep.
- * To remove timing effects from unit tests, this object exists so that time can be mocked out.
- *
- * Code in Livy should not call Thread.sleep() directly. It should call this object instead.
- */
-object Clock {
-  // Swapped by withSleepMethod and read by whichever thread calls sleep, hence @volatile.
-  @volatile private var _sleep: Long => Unit = Thread.sleep
-
-  /**
-   * Runs `f` with `mockSleep` installed as the sleep function and afterwards restores the
-   * function that was installed before the call, even when `f` throws. Restoring the previous
-   * function (rather than always Thread.sleep) keeps an outer mock intact when calls nest.
-   *
-   * @param mockSleep replacement invoked with the requested sleep time
-   * @param f code to run while the replacement is installed
-   */
-  def withSleepMethod(mockSleep: Long => Unit)(f: => Unit): Unit = {
-    val previous = _sleep
-    _sleep = mockSleep
-    try {
-      f
-    } finally {
-      _sleep = previous
-    }
-  }
-
-  /** The currently installed sleep function; Thread.sleep unless a mock is installed. */
-  def sleep: Long => Unit = _sleep
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala
deleted file mode 100644
index a07ab4b..0000000
--- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.util
-
-import com.cloudera.livy.{Logging, Utils}
-
-/**
- * Wraps a [[Process]] and exposes its standard output and standard error through
- * [[LineBufferedStream]]s, alongside a few delegating process controls.
- *
- * @param process the process whose output and error streams are buffered
- */
-class LineBufferedProcess(process: Process) extends Logging {
-
-  private[this] val stdoutBuffer = new LineBufferedStream(process.getInputStream)
-  private[this] val stderrBuffer = new LineBufferedStream(process.getErrorStream)
-
-  /** Lines buffered so far from the process's standard output. */
-  def inputLines: IndexedSeq[String] = stdoutBuffer.lines
-
-  /** Lines buffered so far from the process's standard error. */
-  def errorLines: IndexedSeq[String] = stderrBuffer.lines
-
-  /** A new iterator over the standard output lines. */
-  def inputIterator: Iterator[String] = stdoutBuffer.iterator
-
-  /** A new iterator over the standard error lines. */
-  def errorIterator: Iterator[String] = stderrBuffer.iterator
-
-  /** Destroys the wrapped process. */
-  def destroy(): Unit = process.destroy()
-
-  /** Returns if the process is still actively running. */
-  def isAlive: Boolean = Utils.isProcessAlive(process)
-
-  /** Exit value of the wrapped process, as reported by `Process.exitValue()`. */
-  def exitValue(): Int = process.exitValue()
-
-  /**
-   * Waits for the process to exit and then for both stream readers to finish.
-   *
-   * @return the exit code of the process
-   */
-  def waitFor(): Int = {
-    val exitCode = process.waitFor()
-    Seq(stdoutBuffer, stderrBuffer).foreach(_.waitUntilClose())
-    exitCode
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
deleted file mode 100644
index 5f79ca1..0000000
--- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.util
-
-import java.io.InputStream
-import java.util.concurrent.locks.ReentrantLock
-
-import scala.io.Source
-
-import com.cloudera.livy.Logging
-
-/**
- * Reads `inputStream` line by line on a daemon thread and keeps every line in memory.
- *
- * The lines read so far are available through `lines`. `iterator` returns an iterator that
- * blocks in `hasNext` until another line arrives or the stream has been read to the end.
- *
- * @param inputStream the stream to read; reading starts as soon as the object is constructed
- */
-class LineBufferedStream(inputStream: InputStream) extends Logging {
-
-  // Replaced (never mutated in place) by the reader thread; @volatile because `lines` and the
-  // iterator fast path read it without taking the lock.
-  @volatile private[this] var _lines: IndexedSeq[String] = IndexedSeq()
-
-  private[this] val _lock = new ReentrantLock()
-  private[this] val _condition = _lock.newCondition()
-  // Guarded by _lock.
-  private[this] var _finished = false
-
-  private val thread = new Thread {
-    override def run(): Unit = {
-      try {
-        val lines = Source.fromInputStream(inputStream).getLines()
-        for (line <- lines) {
-          _lock.lock()
-          try {
-            _lines = _lines :+ line
-            _condition.signalAll()
-          } finally {
-            _lock.unlock()
-          }
-        }
-
-        // NOTE(review): info is called with two arguments here; confirm against Logging that
-        // the line is rendered as intended.
-        _lines.foreach { line => info("stdout: ", line) }
-      } finally {
-        // Always mark the stream as finished, even when reading fails, so that iterators
-        // blocked in hasNext are released instead of waiting forever.
-        _lock.lock()
-        try {
-          _finished = true
-          _condition.signalAll()
-        } finally {
-          _lock.unlock()
-        }
-      }
-    }
-  }
-  thread.setDaemon(true)
-  thread.start()
-
-  /** The lines read so far. */
-  def lines: IndexedSeq[String] = _lines
-
-  /** A new iterator starting at the first line; see the class description for blocking. */
-  def iterator: Iterator[String] = {
-    new LinesIterator
-  }
-
-  /** Blocks until the reader thread has terminated. */
-  def waitUntilClose(): Unit = thread.join()
-
-  private class LinesIterator extends Iterator[String] {
-    private[this] var index = 0
-
-    /**
-     * True when a line at the current position is available. Blocks while there is none and
-     * the stream is still being read.
-     */
-    override def hasNext: Boolean = {
-      if (index < _lines.length) {
-        true
-      } else {
-        // Otherwise we might still have more data.
-        _lock.lock()
-        try {
-          // Re-check under the lock and wait in a loop: a line may have been appended (and the
-          // stream finished) after the unlocked check above, and await() may wake spuriously.
-          while (index >= _lines.length && !_finished) {
-            _condition.await()
-          }
-          index < _lines.length
-        } finally {
-          _lock.unlock()
-        }
-      }
-    }
-
-    /** Returns the line at the current position and advances past it. */
-    override def next(): String = {
-      val line = _lines(index)
-      index += 1
-      line
-    }
-  }
-}