You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/06/27 06:39:02 UTC

[21/50] [abbrv] incubator-livy git commit: LIVY-240. Improve the session gc mechanism. (#277)

LIVY-240. Improve the session gc mechanism. (#277)

Current session gc mechanism has some issues:
- Stopped session still needs to wait to timeout to gc-ed.
-Batch session will be gc-ed unexpectedly in run-time when timing out, which makes long running application impossible.
- Sometimes user doesn't want to stop idle sessions.

Changes of this commit:
- Never check the activity of batch session, which means batch session will only be gc-ed after stop.
- Add a configuration to turn off activity check for interactive session, which meets some usage scenarios.
- Add a configuration to control how long a finished session state will be kept in memory before cleaned out.

Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/bfdb5a19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/bfdb5a19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/bfdb5a19

Branch: refs/heads/master
Commit: bfdb5a1925327758b054d8f67191e2bf4bc2c2bb
Parents: 6bfe177
Author: Saisai Shao <sa...@gmail.com>
Authored: Thu Feb 23 06:05:54 2017 +0800
Committer: Alex Man <tc...@gmail.com>
Committed: Wed Feb 22 14:05:54 2017 -0800

----------------------------------------------------------------------
 conf/livy.conf                                  |  6 ++
 .../cloudera/livy/sessions/SessionState.scala   | 11 +++-
 .../main/scala/com/cloudera/livy/LivyConf.scala |  8 +++
 .../cloudera/livy/sessions/SessionManager.scala | 20 ++++++-
 .../livy/sessions/SessionManagerSpec.scala      | 58 +++++++++++++++++++-
 5 files changed, 97 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/conf/livy.conf
----------------------------------------------------------------------
diff --git a/conf/livy.conf b/conf/livy.conf
index a238f37..c23aab2 100644
--- a/conf/livy.conf
+++ b/conf/livy.conf
@@ -16,8 +16,14 @@
 # What spark deploy mode Livy sessions should use.
 # livy.spark.deployMode =
 
+# Enabled to check whether timeout Livy sessions should be stopped.
+# livy.server.session.timeout-check = true
+
 # Time in milliseconds on how long Livy will wait before timing out an idle session.
 # livy.server.session.timeout = 1h
+#
+# How long a finished session state should be kept in LivyServer for query.
+# livy.server.session.state-retain.sec = 600s
 
 # If livy should impersonate the requesting users when creating a new session.
 # livy.impersonation.enabled = true

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala b/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
index 975d592..ac4bf9b 100644
--- a/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
+++ b/core/src/main/scala/com/cloudera/livy/sessions/SessionState.scala
@@ -23,6 +23,11 @@ sealed trait SessionState {
   def isActive: Boolean
 }
 
+sealed trait FinishedSessionState extends SessionState {
+  /** When session is finished. */
+  def time: Long
+}
+
 object SessionState {
 
   def apply(s: String): SessionState = {
@@ -83,19 +88,19 @@ object SessionState {
     override def toString: String = "shutting_down"
   }
 
-  case class Error(time: Long = System.nanoTime()) extends SessionState {
+  case class Error(time: Long = System.nanoTime()) extends FinishedSessionState {
     override def isActive: Boolean = true
 
     override def toString: String = "error"
   }
 
-  case class Dead(time: Long = System.nanoTime()) extends SessionState {
+  case class Dead(time: Long = System.nanoTime()) extends FinishedSessionState {
     override def isActive: Boolean = false
 
     override def toString: String = "dead"
   }
 
-  case class Success(time: Long = System.nanoTime()) extends SessionState {
+  case class Success(time: Long = System.nanoTime()) extends FinishedSessionState {
     override def isActive: Boolean = false
 
     override def toString: String = "success"

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/server/src/main/scala/com/cloudera/livy/LivyConf.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala
index ab0e52b..6562b03 100644
--- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala
+++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala
@@ -124,6 +124,14 @@ object LivyConf {
   // how often to check livy session leakage
   val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check_interval", "60s")
 
+  // Whether session timeout should be checked, by default it will be checked, which means inactive
+  // session will be stopped after "livy.server.session.timeout"
+  val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
+  // How long will an inactive session be gc-ed.
+  val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h")
+  // How long a finished session state will be kept in memory
+  val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s")
+
   val SPARK_MASTER = "spark.master"
   val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
   val SPARK_JARS = "spark.jars"

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
index a50bd41..e177108 100644
--- a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
@@ -36,7 +36,6 @@ import com.cloudera.livy.sessions.Session.RecoveryMetadata
 object SessionManager {
   val SESSION_RECOVERY_MODE_OFF = "off"
   val SESSION_RECOVERY_MODE_RECOVERY = "recovery"
-  val SESSION_TIMEOUT = LivyConf.Entry("livy.server.session.timeout", "1h")
 }
 
 class BatchSessionManager(
@@ -76,8 +75,11 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
   protected[this] final val idCounter = new AtomicInteger(0)
   protected[this] final val sessions = mutable.LinkedHashMap[Int, S]()
 
+  private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK)
   private[this] final val sessionTimeout =
-    TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(SessionManager.SESSION_TIMEOUT))
+    TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT))
+  private[this] final val sessionStateRetainedInSec =
+    TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME))
 
   mockSessions.getOrElse(recover()).foreach(register)
   new GarbageCollector().start()
@@ -134,6 +136,20 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
     def expired(session: Session): Boolean = {
       val currentTime = System.nanoTime()
       currentTime - session.lastActivity > sessionTimeout
+      session.state match {
+        case s: FinishedSessionState =>
+          val currentTime = System.nanoTime()
+          currentTime - s.time > sessionStateRetainedInSec
+        case _ =>
+          if (!sessionTimeoutCheck) {
+            false
+          } else if (session.isInstanceOf[BatchSession]) {
+            false
+          } else {
+            val currentTime = System.nanoTime()
+            currentTime - session.lastActivity > sessionTimeout
+          }
+      }
     }
 
     Future.sequence(all().filter(expired).map(delete))

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/bfdb5a19/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
index 8195bf8..241ba27 100644
--- a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
@@ -30,14 +30,17 @@ import org.scalatest.mock.MockitoSugar.mock
 
 import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
 import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
+import com.cloudera.livy.server.interactive.InteractiveSession
 import com.cloudera.livy.server.recovery.SessionStore
 import com.cloudera.livy.sessions.Session.RecoveryMetadata
 
 class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
+  implicit def executor: ExecutionContext = ExecutionContext.global
+
   describe("SessionManager") {
     it("should garbage collect old sessions") {
       val livyConf = new LivyConf()
-      livyConf.set(SessionManager.SESSION_TIMEOUT, "100ms")
+      livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
       val manager = new SessionManager[MockSession, RecoveryMetadata](
         livyConf,
         { _ => assert(false).asInstanceOf[MockSession] },
@@ -51,6 +54,59 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
         manager.get(session.id) should be(None)
       }
     }
+
+    it("batch session should not be gc-ed until application is finished") {
+      val sessionId = 24
+      val session = mock[BatchSession]
+      when(session.id).thenReturn(sessionId)
+      when(session.stop()).thenReturn(Future {})
+      when(session.lastActivity).thenReturn(System.nanoTime())
+
+      val conf = new LivyConf().set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+      val sm = new BatchSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+      testSessionGC(session, sm)
+    }
+
+    it("interactive session should not gc-ed if session timeout check is off") {
+      val sessionId = 24
+      val session = mock[InteractiveSession]
+      when(session.id).thenReturn(sessionId)
+      when(session.stop()).thenReturn(Future {})
+      when(session.lastActivity).thenReturn(System.nanoTime())
+
+      val conf = new LivyConf().set(LivyConf.SESSION_TIMEOUT_CHECK, false)
+        .set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+      val sm = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+      testSessionGC(session, sm)
+    }
+
+    def testSessionGC(session: Session, sm: SessionManager[_, _]): Unit = {
+
+      def changeStateAndCheck(s: SessionState)(fn: SessionManager[_, _] => Unit): Unit = {
+        when(session.state).thenReturn(s)
+        Await.result(sm.collectGarbage(), Duration.Inf)
+        fn(sm)
+      }
+
+      // Batch session should not be gc-ed when alive
+      for (s <- Seq(SessionState.Running(),
+        SessionState.Idle(),
+        SessionState.Recovering(),
+        SessionState.NotStarted(),
+        SessionState.Busy(),
+        SessionState.ShuttingDown())) {
+        changeStateAndCheck(s) { sm => sm.get(session.id) should be (Some(session)) }
+      }
+
+      // Stopped session should be gc-ed after retained timeout
+      for (s <- Seq(SessionState.Error(),
+        SessionState.Success(),
+        SessionState.Dead())) {
+        eventually(timeout(30 seconds), interval(100 millis)) {
+          changeStateAndCheck(s) { sm => sm.get(session.id) should be (None) }
+        }
+      }
+    }
   }
 
   describe("BatchSessionManager") {