You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/06/27 06:39:02 UTC
[21/50] [abbrv] incubator-livy git commit: LIVY-240. Improve the
session gc mechanism. (#277)
LIVY-240. Improve the session gc mechanism. (#277)
Current session gc mechanism has some issues:
- A stopped session still has to wait for the timeout before it is gc-ed.
- A batch session is gc-ed unexpectedly at run time when it times out, which makes long-running applications impossible.
- Sometimes user doesn't want to stop idle sessions.
Changes of this commit:
- Never check the activity of batch sessions, which means a batch session will only be gc-ed after it has stopped.
- Add a configuration to turn off activity check for interactive session, which meets some usage scenarios.
- Add a configuration to control how long a finished session state will be kept in memory before being cleaned out.
Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/bfdb5a19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/bfdb5a19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/bfdb5a19
Branch: refs/heads/master
Commit: bfdb5a1925327758b054d8f67191e2bf4bc2c2bb
Parents: 6bfe177
Author: Saisai Shao <sa...@gmail.com>
Authored: Thu Feb 23 06:05:54 2017 +0800
Committer: Alex Man <tc...@gmail.com>
Committed: Wed Feb 22 14:05:54 2017 -0800
----------------------------------------------------------------------
conf/livy.conf | 6 ++
.../cloudera/livy/sessions/SessionState.scala | 11 +++-
.../main/scala/com/cloudera/livy/LivyConf.scala | 8 +++
.../cloudera/livy/sessions/SessionManager.scala | 20 ++++++-
.../livy/sessions/SessionManagerSpec.scala | 58 +++++++++++++++++++-
5 files changed, 97 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/conf/livy.conf
----------------------------------------------------------------------
diff --git a/conf/livy.conf b/conf/livy.conf
index a238f37..c23aab2 100644
--- a/conf/livy.conf
+++ b/conf/livy.conf
@@ -16,8 +16,14 @@
# What spark deploy mode Livy sessions should use.
# livy.spark.deployMode =
+# Whether Livy should check for timed-out sessions and stop them.
+# livy.server.session.timeout-check = true
+
# Time in milliseconds on how long Livy will wait before timing out an idle session.
# livy.server.session.timeout = 1h
+#
+# How long a finished session state should be kept in LivyServer for query.
+# livy.server.session.state-retain.sec = 600s
# If livy should impersonate the requesting users when creating a new session.
# livy.impersonation.enabled = true
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala b/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
index 975d592..ac4bf9b 100644
--- a/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
+++ b/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
@@ -23,6 +23,11 @@ sealed trait SessionState {
def isActive: Boolean
}
+sealed trait FinishedSessionState extends SessionState {
+ /** When session is finished. */
+ def time: Long
+}
+
object SessionState {
def apply(s: String): SessionState = {
@@ -83,19 +88,19 @@ object SessionState {
override def toString: String = "shutting_down"
}
- case class Error(time: Long = System.nanoTime()) extends SessionState {
+ case class Error(time: Long = System.nanoTime()) extends FinishedSessionState {
override def isActive: Boolean = true
override def toString: String = "error"
}
- case class Dead(time: Long = System.nanoTime()) extends SessionState {
+ case class Dead(time: Long = System.nanoTime()) extends FinishedSessionState {
override def isActive: Boolean = false
override def toString: String = "dead"
}
- case class Success(time: Long = System.nanoTime()) extends SessionState {
+ case class Success(time: Long = System.nanoTime()) extends FinishedSessionState {
override def isActive: Boolean = false
override def toString: String = "success"
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/server/src/main/scala/com/cloudera/livy/LivyConf.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala
index ab0e52b..6562b03 100644
--- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala
+++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala
@@ -124,6 +124,14 @@ object LivyConf {
// how often to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check_interval", "60s")
+ // Whether session timeout should be checked, by default it will be checked, which means inactive
+ // session will be stopped after "livy.server.session.timeout"
+ val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
+ // How long a session may stay inactive before it is gc-ed.
+ val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h")
+ // How long a finished session state will be kept in memory
+ val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s")
+
val SPARK_MASTER = "spark.master"
val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
val SPARK_JARS = "spark.jars"
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
index a50bd41..e177108 100644
--- a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
@@ -36,7 +36,6 @@ import com.cloudera.livy.sessions.Session.RecoveryMetadata
object SessionManager {
val SESSION_RECOVERY_MODE_OFF = "off"
val SESSION_RECOVERY_MODE_RECOVERY = "recovery"
- val SESSION_TIMEOUT = LivyConf.Entry("livy.server.session.timeout", "1h")
}
class BatchSessionManager(
@@ -76,8 +75,11 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
protected[this] final val idCounter = new AtomicInteger(0)
protected[this] final val sessions = mutable.LinkedHashMap[Int, S]()
+ private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK)
private[this] final val sessionTimeout =
- TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(SessionManager.SESSION_TIMEOUT))
+ TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT))
+ private[this] final val sessionStateRetainedInSec =
+ TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME))
mockSessions.getOrElse(recover()).foreach(register)
new GarbageCollector().start()
@@ -134,6 +136,20 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
def expired(session: Session): Boolean = {
val currentTime = System.nanoTime()
currentTime - session.lastActivity > sessionTimeout
+ session.state match {
+ case s: FinishedSessionState =>
+ val currentTime = System.nanoTime()
+ currentTime - s.time > sessionStateRetainedInSec
+ case _ =>
+ if (!sessionTimeoutCheck) {
+ false
+ } else if (session.isInstanceOf[BatchSession]) {
+ false
+ } else {
+ val currentTime = System.nanoTime()
+ currentTime - session.lastActivity > sessionTimeout
+ }
+ }
}
Future.sequence(all().filter(expired).map(delete))
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
index 8195bf8..241ba27 100644
--- a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
@@ -30,14 +30,17 @@ import org.scalatest.mock.MockitoSugar.mock
import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
+import com.cloudera.livy.server.interactive.InteractiveSession
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.Session.RecoveryMetadata
class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
+ implicit def executor: ExecutionContext = ExecutionContext.global
+
describe("SessionManager") {
it("should garbage collect old sessions") {
val livyConf = new LivyConf()
- livyConf.set(SessionManager.SESSION_TIMEOUT, "100ms")
+ livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
val manager = new SessionManager[MockSession, RecoveryMetadata](
livyConf,
{ _ => assert(false).asInstanceOf[MockSession] },
@@ -51,6 +54,59 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
manager.get(session.id) should be(None)
}
}
+
+ it("batch session should not be gc-ed until application is finished") {
+ val sessionId = 24
+ val session = mock[BatchSession]
+ when(session.id).thenReturn(sessionId)
+ when(session.stop()).thenReturn(Future {})
+ when(session.lastActivity).thenReturn(System.nanoTime())
+
+ val conf = new LivyConf().set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+ val sm = new BatchSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+ testSessionGC(session, sm)
+ }
+
+ it("interactive session should not gc-ed if session timeout check is off") {
+ val sessionId = 24
+ val session = mock[InteractiveSession]
+ when(session.id).thenReturn(sessionId)
+ when(session.stop()).thenReturn(Future {})
+ when(session.lastActivity).thenReturn(System.nanoTime())
+
+ val conf = new LivyConf().set(LivyConf.SESSION_TIMEOUT_CHECK, false)
+ .set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+ val sm = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+ testSessionGC(session, sm)
+ }
+
+ def testSessionGC(session: Session, sm: SessionManager[_, _]): Unit = {
+
+ def changeStateAndCheck(s: SessionState)(fn: SessionManager[_, _] => Unit): Unit = {
+ when(session.state).thenReturn(s)
+ Await.result(sm.collectGarbage(), Duration.Inf)
+ fn(sm)
+ }
+
+ // A session should not be gc-ed while it is still alive
+ for (s <- Seq(SessionState.Running(),
+ SessionState.Idle(),
+ SessionState.Recovering(),
+ SessionState.NotStarted(),
+ SessionState.Busy(),
+ SessionState.ShuttingDown())) {
+ changeStateAndCheck(s) { sm => sm.get(session.id) should be (Some(session)) }
+ }
+
+ // Stopped session should be gc-ed after retained timeout
+ for (s <- Seq(SessionState.Error(),
+ SessionState.Success(),
+ SessionState.Dead())) {
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ changeStateAndCheck(s) { sm => sm.get(session.id) should be (None) }
+ }
+ }
+ }
}
describe("BatchSessionManager") {