You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by lm...@apache.org on 2023/02/10 16:12:37 UTC

[incubator-livy] branch master updated: [LIVY-968][SERVER] Provide ttl field for a livy session (#384)

This is an automated email from the ASF dual-hosted git repository.

lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 45e07fec [LIVY-968][SERVER] Provide ttl field for a livy session (#384)
45e07fec is described below

commit 45e07fec68f2b9ad1dc7ebce8db08ad8a778dddc
Author: Asif Khatri <12...@users.noreply.github.com>
AuthorDate: Fri Feb 10 21:42:31 2023 +0530

    [LIVY-968][SERVER] Provide ttl field for a livy session (#384)
    
    * [LIVY-968][SERVER] Provide ttl field for a livy session via askhatri
---
 .../org/apache/livy/client/common/ClientConf.java    | 18 +++++++++++++-----
 .../org/apache/livy/client/common/HttpMessages.java  |  6 ++++--
 .../apache/livy/client/common/TestClientConf.java    | 20 ++++++++++++++++++++
 .../org/apache/livy/client/http/HttpClientSpec.scala |  1 +
 docs/rest-api.md                                     | 10 ++++++++++
 scalastyle.xml                                       |  2 +-
 .../org/apache/livy/server/SessionServlet.scala      | 19 ++++++++++++-------
 .../interactive/CreateInteractiveRequest.scala       |  4 +++-
 .../livy/server/interactive/InteractiveSession.scala |  9 +++++++--
 .../interactive/InteractiveSessionServlet.scala      | 15 ++++++++++++---
 .../scala/org/apache/livy/sessions/Session.scala     |  9 +++++++++
 .../org/apache/livy/sessions/SessionManager.scala    | 13 ++++++++++---
 .../interactive/InteractiveSessionServletSpec.scala  |  2 ++
 .../server/interactive/InteractiveSessionSpec.scala  |  9 +++++----
 .../scala/org/apache/livy/sessions/MockSession.scala |  5 +++--
 .../apache/livy/sessions/SessionManagerSpec.scala    | 11 +++++++++++
 .../livy/thriftserver/LivyThriftSessionManager.scala |  3 ++-
 17 files changed, 125 insertions(+), 31 deletions(-)

diff --git a/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java b/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java
index d0234bfc..01571079 100644
--- a/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java
+++ b/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java
@@ -36,7 +36,7 @@ import org.apache.livy.annotations.Private;
 public abstract class ClientConf<T extends ClientConf>
   implements Iterable<Map.Entry<String, String>> {
 
-  protected Logger LOG = LoggerFactory.getLogger(getClass());
+  protected static final Logger LOG = LoggerFactory.getLogger(ClientConf.class);
 
   public static interface ConfEntry {
 
@@ -152,11 +152,15 @@ public abstract class ClientConf<T extends ClientConf>
     String time = get(e, String.class);
     if (time == null) {
       check(e.dflt() != null,
-        "ConfEntry %s doesn't have a default value, cannot convert to time value.", e.key());
+              "ConfEntry %s doesn't have a default value, cannot convert to time value.", e.key());
       time = (String) e.dflt();
     }
+    return getTimeAsMs(time);
+  }
 
-    Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase());
+  public static long getTimeAsMs(String time) {
+    check(time != null && !time.trim().isEmpty(), "Invalid time string: %s", time);
+    Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.trim().toLowerCase());
     if (!m.matches()) {
       throw new IllegalArgumentException("Invalid time string: " + time);
     }
@@ -168,8 +172,12 @@ public abstract class ClientConf<T extends ClientConf>
       throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
     }
 
+    if (val < 0L) {
+      throw new IllegalArgumentException("Invalid value: " + val);
+    }
+
     return TimeUnit.MILLISECONDS.convert(val,
-      suffix != null ? TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
+            suffix != null ? TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
   }
 
   @SuppressWarnings("unchecked")
@@ -204,7 +212,7 @@ public abstract class ClientConf<T extends ClientConf>
     return (o != null) ? o.getClass() : String.class;
   }
 
-  private void check(boolean test, String message, Object... args) {
+  private static void check(boolean test, String message, Object... args) {
     if (!test) {
       throw new IllegalArgumentException(String.format(message, args));
     }
diff --git a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
index 2245eb98..a88ed8ca 100644
--- a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
+++ b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
@@ -61,9 +61,10 @@ public class HttpMessages {
     public final String kind;
     public final Map<String, String> appInfo;
     public final List<String> log;
+    public final String ttl;
 
     public SessionInfo(int id, String name, String appId, String owner, String proxyUser,
-        String state, String kind, Map<String, String> appInfo, List<String> log) {
+        String state, String kind, Map<String, String> appInfo, List<String> log, String ttl) {
       this.id = id;
       this.name = name;
       this.appId = appId;
@@ -73,10 +74,11 @@ public class HttpMessages {
       this.kind = kind;
       this.appInfo = appInfo;
       this.log = log;
+      this.ttl = ttl;
     }
 
     private SessionInfo() {
-      this(-1, null, null, null, null, null, null, null, null);
+      this(-1, null, null, null, null, null, null, null, null, null);
     }
 
   }
diff --git a/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java b/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java
index 02bbaaad..b07b396b 100644
--- a/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java
+++ b/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java
@@ -37,6 +37,26 @@ public class TestClientConf {
     assertEquals(42, conf.getInt(TestConf.Entry.INT));
     assertEquals(84L, conf.getLong(TestConf.Entry.LONG));
     assertEquals(168L, conf.getTimeAsMs(TestConf.Entry.TIME));
+    assertEquals(60000L, TestConf.getTimeAsMs("1m"));
+    assertEquals(80L, TestConf.getTimeAsMs("  80    "));
+    try {
+      TestConf.getTimeAsMs("invalid");
+      fail("Should have failed to getTimeAsMs for invalid ttl.");
+    } catch (IllegalArgumentException ie) {
+      // Expected.
+    }
+    try {
+      TestConf.getTimeAsMs("30b");
+      fail("Should have failed to getTimeAsMs for invalid ttl suffix.");
+    } catch (IllegalArgumentException ie) {
+      // Expected.
+    }
+    try {
+      TestConf.getTimeAsMs("-1m");
+      fail("Should have failed to getTimeAsMs for invalid ttl value.");
+    } catch (IllegalArgumentException ie) {
+      // Expected.
+    }
 
     try {
       conf.get(TestConf.Entry.INT);
diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
index f53d9f5b..336ff8c8 100644
--- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
+++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
@@ -280,6 +280,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
         when(session.proxyUser).thenReturn(None)
         when(session.kind).thenReturn(Spark)
         when(session.stop()).thenReturn(Future.successful(()))
+        when(session.ttl).thenReturn(None)
         require(HttpClientSpec.session == null, "Session already created?")
         HttpClientSpec.session = session
         session
diff --git a/docs/rest-api.md b/docs/rest-api.md
index d80e77d2..342bd59a 100644
--- a/docs/rest-api.md
+++ b/docs/rest-api.md
@@ -151,6 +151,11 @@ Creates a new interactive Scala, Python, or R shell in the cluster.
     <td>Timeout in second to which session be orphaned</td>
     <td>int</td>
   </tr>
+  <tr>
+    <td>ttl</td>
+    <td>Inactivity timeout for this session, e.g. 10m (10 minutes); a value without a unit suffix is read as milliseconds</td>
+    <td>string</td>
+  </tr>
 </table>
 
 <a id="footnote1">1</a>: Starting with version 0.5.0-incubating this field is not required. To be
@@ -816,6 +821,11 @@ A statement represents the result of an execution statement.
     <td>The detailed application info</td>
     <td>Map of key=val</td>
   </tr>
+  <tr>
+    <td>ttl</td>
+    <td>Inactivity timeout for this session, e.g. 10m (10 minutes)</td>
+    <td>string</td>
+  </tr>
   <tr>
     <td>log</td>
     <td>The log lines</td>
diff --git a/scalastyle.xml b/scalastyle.xml
index 28609fb7..3abfe183 100644
--- a/scalastyle.xml
+++ b/scalastyle.xml
@@ -57,7 +57,7 @@
 
   <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
     <parameters>
-      <parameter name="maxParameters"><![CDATA[10]]></parameter>
+      <parameter name="maxParameters"><![CDATA[11]]></parameter>
     </parameters>
   </check>
 
diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
index a726e7d8..a63e1aec 100644
--- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
@@ -131,13 +131,18 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
       if (tooManySessions) {
         BadRequest(ResponseMessage("Rejected, too many sessions are being created!"))
       } else {
-        val session = sessionManager.register(createSession(request))
-        // Because it may take some time to establish the session, update the last activity
-        // time before returning the session info to the client.
-        session.recordActivity()
-        Created(clientSessionView(session, request),
-          headers = Map("Location" ->
-            (getRequestPathInfo(request) + url(getSession, "id" -> session.id.toString))))
+        try {
+          val session = sessionManager.register(createSession(request))
+          // Because it may take some time to establish the session, update the last activity
+          // time before returning the session info to the client.
+          session.recordActivity()
+          Created(clientSessionView(session, request),
+            headers = Map("Location" ->
+              (getRequestPathInfo(request) + url(getSession, "id" -> session.id.toString))))
+        } catch {
+          case e: IllegalArgumentException =>
+            BadRequest(ResponseMessage("Rejected, Reason: " + e.getMessage))
+        }
       }
     }
   }
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
index b2f34b00..c685e29c 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
@@ -35,6 +35,7 @@ class CreateInteractiveRequest {
   var name: Option[String] = None
   var conf: Map[String, String] = Map()
   var heartbeatTimeoutInSecond: Int = 0
+  var ttl: Option[String] = None
 
   override def toString: String = {
     s"[kind: $kind, proxyUser: $proxyUser, " +
@@ -50,6 +51,7 @@ class CreateInteractiveRequest {
       (if (queue.isDefined) s"queue: ${queue.get}, " else "") +
       (if (name.isDefined) s"name: ${name.get}, " else "") +
       (if (conf.nonEmpty) s"conf: ${conf.mkString(",")}, " else "") +
-      s"heartbeatTimeoutInSecond: $heartbeatTimeoutInSecond]"
+      s"heartbeatTimeoutInSecond: $heartbeatTimeoutInSecond, " +
+      (if (ttl.isDefined) s"ttl: ${ttl.get}]" else "]")
   }
 }
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index c4c273ac..61305242 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -54,6 +54,7 @@ case class InteractiveRecoveryMetadata(
     kind: Kind,
     heartbeatTimeoutS: Int,
     owner: String,
+    ttl: Option[String],
     proxyUser: Option[String],
     rscDriverUri: Option[URI],
     version: Int = 1)
@@ -73,6 +74,7 @@ object InteractiveSession extends Logging {
       accessManager: AccessManager,
       request: CreateInteractiveRequest,
       sessionStore: SessionStore,
+      ttl: Option[String],
       mockApp: Option[SparkApp] = None,
       mockClient: Option[RSCClient] = None): InteractiveSession = {
     val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}"
@@ -123,6 +125,7 @@ object InteractiveSession extends Logging {
       livyConf,
       owner,
       impersonatedUser,
+      ttl,
       sessionStore,
       mockApp)
   }
@@ -150,6 +153,7 @@ object InteractiveSession extends Logging {
       livyConf,
       metadata.owner,
       metadata.proxyUser,
+      metadata.ttl,
       sessionStore,
       mockApp)
   }
@@ -371,9 +375,10 @@ class InteractiveSession(
     livyConf: LivyConf,
     owner: String,
     override val proxyUser: Option[String],
+    ttl: Option[String],
     sessionStore: SessionStore,
     mockApp: Option[SparkApp]) // For unit test.
-  extends Session(id, name, owner, livyConf)
+  extends Session(id, name, owner, ttl, livyConf)
   with SessionHeartbeat
   with SparkAppListener {
 
@@ -464,7 +469,7 @@ class InteractiveSession(
 
   override def recoveryMetadata: RecoveryMetadata =
     InteractiveRecoveryMetadata(id, name, appId, appTag, kind,
-      heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)
+      heartbeatTimeout.toSeconds.toInt, owner, None, proxyUser, rscDriverUri)
 
   override def state: SessionState = {
     if (serverSideState == SessionState.Running) {
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index 3b700875..4440c775 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -29,6 +29,7 @@ import org.scalatra._
 import org.scalatra.servlet.FileUploadSupport
 
 import org.apache.livy.{CompletionRequest, ExecuteRequest, JobHandle, LivyConf, Logging}
+import org.apache.livy.client.common.ClientConf
 import org.apache.livy.client.common.HttpMessages
 import org.apache.livy.client.common.HttpMessages._
 import org.apache.livy.server.{AccessManager, SessionServlet}
@@ -52,15 +53,23 @@ class InteractiveSessionServlet(
 
   override protected def createSession(req: HttpServletRequest): InteractiveSession = {
     val createRequest = bodyAs[CreateInteractiveRequest](req)
+    val sessionId = sessionManager.nextId();
+
+    // Calling getTimeAsMs just to validate the ttl value
+    if (createRequest.ttl.isDefined) {
+      ClientConf.getTimeAsMs(createRequest.ttl.get);
+    }
+
     InteractiveSession.create(
-      sessionManager.nextId(),
+      sessionId,
       createRequest.name,
       remoteUser(req),
       proxyUser(req, createRequest.proxyUser),
       livyConf,
       accessManager,
       createRequest,
-      sessionStore)
+      sessionStore,
+      createRequest.ttl)
   }
 
   override protected[interactive] def clientSessionView(
@@ -85,7 +94,7 @@ class InteractiveSessionServlet(
 
     new SessionInfo(session.id, session.name.orNull, session.appId.orNull, session.owner,
       session.proxyUser.orNull, session.state.toString, session.kind.toString,
-      session.appInfo.asJavaMap, logs.asJava)
+      session.appInfo.asJavaMap, logs.asJava, session.ttl.orNull)
   }
 
   post("/:id/stop") {
diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala b/server/src/main/scala/org/apache/livy/sessions/Session.scala
index 67f78d4d..197e22f9 100644
--- a/server/src/main/scala/org/apache/livy/sessions/Session.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala
@@ -138,9 +138,17 @@ abstract class Session(
     val id: Int,
     val name: Option[String],
     val owner: String,
+    val ttl: Option[String],
     val livyConf: LivyConf)
   extends Logging {
 
+  def this(id: Int,
+   name: Option[String],
+   owner: String,
+   livyConf: LivyConf) {
+    this(id, name, owner, None, livyConf)
+  }
+
   import Session._
 
   protected implicit val executionContext = ExecutionContext.global
@@ -164,6 +172,7 @@ abstract class Session(
 
   var appInfo: AppInfo = AppInfo()
 
+
   def lastActivity: Long = state match {
     case SessionState.Error(time) => time
     case SessionState.Dead(time) => time
diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
index f2548ac0..25bf593d 100644
--- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
@@ -27,6 +27,7 @@ import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.client.common.ClientConf
 import org.apache.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
 import org.apache.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession, SessionHeartbeatWatchdog}
 import org.apache.livy.server.recovery.SessionStore
@@ -79,8 +80,9 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
   private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK)
   private[this] final val sessionTimeoutCheckSkipBusy =
     livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK_SKIP_BUSY)
-  private[this] final val sessionTimeout =
-    TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT))
+
+  private[this] final val sessionTimeout = livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT)
+
   private[this] final val sessionStateRetainedInSec =
     TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME))
 
@@ -168,7 +170,12 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
             false
           } else {
             val currentTime = System.nanoTime()
-            currentTime - session.lastActivity > sessionTimeout
+            var calculatedTimeout = sessionTimeout;
+            if (session.ttl.isDefined) {
+              calculatedTimeout = ClientConf.getTimeAsMs(session.ttl.get)
+            }
+            calculatedTimeout = TimeUnit.MILLISECONDS.toNanos(calculatedTimeout)
+            currentTime - session.lastActivity > calculatedTimeout
           }
       }
     }
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
index 78407d5a..0f1cdc7f 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -69,6 +69,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
       when(session.stop()).thenReturn(Future.successful(()))
       when(session.proxyUser).thenReturn(None)
       when(session.heartbeatExpired).thenReturn(false)
+      when(session.ttl).thenReturn(None)
       when(session.statements).thenAnswer(
         new Answer[IndexedSeq[Statement]]() {
           override def answer(args: InvocationOnMock): IndexedSeq[Statement] = statements
@@ -182,6 +183,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
     when(session.appInfo).thenReturn(appInfo)
     when(session.logLines()).thenReturn(log)
     when(session.heartbeatExpired).thenReturn(false)
+    when(session.ttl).thenReturn(None)
 
     val req = mock[HttpServletRequest]
 
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index d13e6826..02aca27a 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -71,7 +71,7 @@ class InteractiveSessionSpec extends FunSpec
       RSCConf.Entry.LIVY_JARS.key() -> ""
     )
     InteractiveSession.create(0, None, null, None, livyConf, accessManager, req,
-      sessionStore, mockApp)
+      sessionStore, None, mockApp)
   }
 
   private def executeStatement(code: String, codeType: Option[String] = None): JValue = {
@@ -277,7 +277,8 @@ class InteractiveSessionSpec extends FunSpec
       val mockClient = mock[RSCClient]
       when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
       val m = InteractiveRecoveryMetadata(
-          78, Some("Test session"), None, "appTag", Spark, 0, null, None, Some(URI.create("")))
+          78, Some("Test session"), None, "appTag", Spark, 0, null, None, None,
+          Some(URI.create("")))
       val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
       s.start()
 
@@ -294,7 +295,7 @@ class InteractiveSessionSpec extends FunSpec
       val mockClient = mock[RSCClient]
       when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
       val m = InteractiveRecoveryMetadata(
-          78, None, None, "appTag", Spark, 0, null, None, Some(URI.create("")))
+          78, None, None, "appTag", Spark, 0, null, None, None, Some(URI.create("")))
       val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
       s.start()
 
@@ -309,7 +310,7 @@ class InteractiveSessionSpec extends FunSpec
       val conf = new LivyConf()
       val sessionStore = mock[SessionStore]
       val m = InteractiveRecoveryMetadata(
-        78, None, Some("appId"), "appTag", Spark, 0, null, None, None)
+        78, None, Some("appId"), "appTag", Spark, 0, null, None, None, None)
       val s = InteractiveSession.recover(m, conf, sessionStore, None)
       s.start()
       s.state shouldBe a[SessionState.Dead]
diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
index f9609b19..e28d566b 100644
--- a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
@@ -19,8 +19,9 @@ package org.apache.livy.sessions
 
 import org.apache.livy.LivyConf
 
-class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] = None)
-  extends Session(id, name, owner, conf) {
+class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] = None,
+                  ttl: Option[String] = None)
+  extends Session(id, name, owner, ttl, conf) {
   case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata()
 
   override val proxyUser = None
diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
index 8014d4ab..c32eebaf 100644
--- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
@@ -59,6 +59,17 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
       }
     }
 
+    it("should garbage collect old sessions with ttl") {
+      val (livyConf, manager) = createSessionManager()
+      val session = manager.register(new MockSession(manager.nextId(), null, livyConf,
+        None, Some("4s")))
+      manager.get(session.id).isDefined should be(true)
+      eventually(timeout(5 seconds), interval(100 millis)) {
+        Await.result(manager.collectGarbage(), Duration.Inf)
+        manager.get(session.id) should be(None)
+      }
+    }
+
     it("should not garbage collect busy sessions if skip-busy configured") {
       val lc = new LivyConf()
       lc.set(LivyConf.SESSION_TIMEOUT_CHECK_SKIP_BUSY, true)
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 0b4bbd54..11294db8 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -235,7 +235,8 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
         server.livyConf,
         server.accessManager,
         createInteractiveRequest,
-        server.sessionStore)
+        server.sessionStore,
+        None)
       onLivySessionOpened(newSession)
       newSession
     }