You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by lm...@apache.org on 2023/02/10 16:12:37 UTC
[incubator-livy] branch master updated: [LIVY-968][SERVER] Provide ttl field for a livy session (#384)
This is an automated email from the ASF dual-hosted git repository.
lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 45e07fec [LIVY-968][SERVER] Provide ttl field for a livy session (#384)
45e07fec is described below
commit 45e07fec68f2b9ad1dc7ebce8db08ad8a778dddc
Author: Asif Khatri <12...@users.noreply.github.com>
AuthorDate: Fri Feb 10 21:42:31 2023 +0530
[LIVY-968][SERVER] Provide ttl field for a livy session (#384)
* [LIVY-968][SERVER] Provide ttl field for a livy session via askhatri
---
.../org/apache/livy/client/common/ClientConf.java | 18 +++++++++++++-----
.../org/apache/livy/client/common/HttpMessages.java | 6 ++++--
.../apache/livy/client/common/TestClientConf.java | 20 ++++++++++++++++++++
.../org/apache/livy/client/http/HttpClientSpec.scala | 1 +
docs/rest-api.md | 10 ++++++++++
scalastyle.xml | 2 +-
.../org/apache/livy/server/SessionServlet.scala | 19 ++++++++++++-------
.../interactive/CreateInteractiveRequest.scala | 4 +++-
.../livy/server/interactive/InteractiveSession.scala | 9 +++++++--
.../interactive/InteractiveSessionServlet.scala | 15 ++++++++++++---
.../scala/org/apache/livy/sessions/Session.scala | 9 +++++++++
.../org/apache/livy/sessions/SessionManager.scala | 13 ++++++++++---
.../interactive/InteractiveSessionServletSpec.scala | 2 ++
.../server/interactive/InteractiveSessionSpec.scala | 9 +++++----
.../scala/org/apache/livy/sessions/MockSession.scala | 5 +++--
.../apache/livy/sessions/SessionManagerSpec.scala | 11 +++++++++++
.../livy/thriftserver/LivyThriftSessionManager.scala | 3 ++-
17 files changed, 125 insertions(+), 31 deletions(-)
diff --git a/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java b/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java
index d0234bfc..01571079 100644
--- a/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java
+++ b/client-common/src/main/java/org/apache/livy/client/common/ClientConf.java
@@ -36,7 +36,7 @@ import org.apache.livy.annotations.Private;
public abstract class ClientConf<T extends ClientConf>
implements Iterable<Map.Entry<String, String>> {
- protected Logger LOG = LoggerFactory.getLogger(getClass());
+ protected static final Logger LOG = LoggerFactory.getLogger(ClientConf.class);
public static interface ConfEntry {
@@ -152,11 +152,15 @@ public abstract class ClientConf<T extends ClientConf>
String time = get(e, String.class);
if (time == null) {
check(e.dflt() != null,
- "ConfEntry %s doesn't have a default value, cannot convert to time value.", e.key());
+ "ConfEntry %s doesn't have a default value, cannot convert to time value.", e.key());
time = (String) e.dflt();
}
+ return getTimeAsMs(time);
+ }
- Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase());
+ public static long getTimeAsMs(String time) {
+ check(time != null && !time.trim().isEmpty(), "Invalid time string: %s", time);
+ Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.trim().toLowerCase());
if (!m.matches()) {
throw new IllegalArgumentException("Invalid time string: " + time);
}
@@ -168,8 +172,12 @@ public abstract class ClientConf<T extends ClientConf>
throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
}
+ if (val < 0L) {
+ throw new IllegalArgumentException("Invalid value: " + val);
+ }
+
return TimeUnit.MILLISECONDS.convert(val,
- suffix != null ? TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
+ suffix != null ? TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS);
}
@SuppressWarnings("unchecked")
@@ -204,7 +212,7 @@ public abstract class ClientConf<T extends ClientConf>
return (o != null) ? o.getClass() : String.class;
}
- private void check(boolean test, String message, Object... args) {
+ private static void check(boolean test, String message, Object... args) {
if (!test) {
throw new IllegalArgumentException(String.format(message, args));
}
diff --git a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
index 2245eb98..a88ed8ca 100644
--- a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
+++ b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
@@ -61,9 +61,10 @@ public class HttpMessages {
public final String kind;
public final Map<String, String> appInfo;
public final List<String> log;
+ public final String ttl;
public SessionInfo(int id, String name, String appId, String owner, String proxyUser,
- String state, String kind, Map<String, String> appInfo, List<String> log) {
+ String state, String kind, Map<String, String> appInfo, List<String> log, String ttl) {
this.id = id;
this.name = name;
this.appId = appId;
@@ -73,10 +74,11 @@ public class HttpMessages {
this.kind = kind;
this.appInfo = appInfo;
this.log = log;
+ this.ttl = ttl;
}
private SessionInfo() {
- this(-1, null, null, null, null, null, null, null, null);
+ this(-1, null, null, null, null, null, null, null, null, null);
}
}
diff --git a/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java b/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java
index 02bbaaad..b07b396b 100644
--- a/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java
+++ b/client-common/src/test/java/org/apache/livy/client/common/TestClientConf.java
@@ -37,6 +37,26 @@ public class TestClientConf {
assertEquals(42, conf.getInt(TestConf.Entry.INT));
assertEquals(84L, conf.getLong(TestConf.Entry.LONG));
assertEquals(168L, conf.getTimeAsMs(TestConf.Entry.TIME));
+ assertEquals(60000L, TestConf.getTimeAsMs("1m"));
+ assertEquals(80L, TestConf.getTimeAsMs(" 80 "));
+ try {
+ TestConf.getTimeAsMs("invalid");
+ fail("Should have failed to getTimeAsMs for invalid ttl.");
+ } catch (IllegalArgumentException ie) {
+ // Expected.
+ }
+ try {
+ TestConf.getTimeAsMs("30b");
+ fail("Should have failed to getTimeAsMs for invalid ttl suffix.");
+ } catch (IllegalArgumentException ie) {
+ // Expected.
+ }
+ try {
+ TestConf.getTimeAsMs("-1m");
+ fail("Should have failed to getTimeAsMs for invalid ttl value.");
+ } catch (IllegalArgumentException ie) {
+ // Expected.
+ }
try {
conf.get(TestConf.Entry.INT);
diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
index f53d9f5b..336ff8c8 100644
--- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
+++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
@@ -280,6 +280,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
when(session.proxyUser).thenReturn(None)
when(session.kind).thenReturn(Spark)
when(session.stop()).thenReturn(Future.successful(()))
+ when(session.ttl).thenReturn(None)
require(HttpClientSpec.session == null, "Session already created?")
HttpClientSpec.session = session
session
diff --git a/docs/rest-api.md b/docs/rest-api.md
index d80e77d2..342bd59a 100644
--- a/docs/rest-api.md
+++ b/docs/rest-api.md
@@ -151,6 +151,11 @@ Creates a new interactive Scala, Python, or R shell in the cluster.
<td>Timeout in seconds after which the session is orphaned</td>
<td>int</td>
</tr>
+ <tr>
+ <td>ttl</td>
+ <td>The inactivity timeout for this session; when set, it is used instead of the server-wide session timeout. Example: 10m (10 minutes)</td>
+ <td>string</td>
+ </tr>
</table>
<a id="footnote1">1</a>: Starting with version 0.5.0-incubating this field is not required. To be
@@ -816,6 +821,11 @@ A statement represents the result of an execution statement.
<td>The detailed application info</td>
<td>Map of key=val</td>
</tr>
+ <tr>
+ <td>ttl</td>
+ <td>The inactivity timeout configured for this session, for example: 10m (10 minutes)</td>
+ <td>string</td>
+ </tr>
<tr>
<td>log</td>
<td>The log lines</td>
diff --git a/scalastyle.xml b/scalastyle.xml
index 28609fb7..3abfe183 100644
--- a/scalastyle.xml
+++ b/scalastyle.xml
@@ -57,7 +57,7 @@
<check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
- <parameter name="maxParameters"><![CDATA[10]]></parameter>
+ <parameter name="maxParameters"><![CDATA[11]]></parameter>
</parameters>
</check>
diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
index a726e7d8..a63e1aec 100644
--- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
@@ -131,13 +131,18 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
if (tooManySessions) {
BadRequest(ResponseMessage("Rejected, too many sessions are being created!"))
} else {
- val session = sessionManager.register(createSession(request))
- // Because it may take some time to establish the session, update the last activity
- // time before returning the session info to the client.
- session.recordActivity()
- Created(clientSessionView(session, request),
- headers = Map("Location" ->
- (getRequestPathInfo(request) + url(getSession, "id" -> session.id.toString))))
+ try {
+ val session = sessionManager.register(createSession(request))
+ // Because it may take some time to establish the session, update the last activity
+ // time before returning the session info to the client.
+ session.recordActivity()
+ Created(clientSessionView(session, request),
+ headers = Map("Location" ->
+ (getRequestPathInfo(request) + url(getSession, "id" -> session.id.toString))))
+ } catch {
+ case e: IllegalArgumentException =>
+ BadRequest(ResponseMessage("Rejected, Reason: " + e.getMessage))
+ }
}
}
}
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
index b2f34b00..c685e29c 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
@@ -35,6 +35,7 @@ class CreateInteractiveRequest {
var name: Option[String] = None
var conf: Map[String, String] = Map()
var heartbeatTimeoutInSecond: Int = 0
+ var ttl: Option[String] = None
override def toString: String = {
s"[kind: $kind, proxyUser: $proxyUser, " +
@@ -50,6 +51,7 @@ class CreateInteractiveRequest {
(if (queue.isDefined) s"queue: ${queue.get}, " else "") +
(if (name.isDefined) s"name: ${name.get}, " else "") +
(if (conf.nonEmpty) s"conf: ${conf.mkString(",")}, " else "") +
- s"heartbeatTimeoutInSecond: $heartbeatTimeoutInSecond]"
+ s"heartbeatTimeoutInSecond: $heartbeatTimeoutInSecond, " +
+ (if (ttl.isDefined) s"ttl: ${ttl.get}]" else "]")
}
}
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index c4c273ac..61305242 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -54,6 +54,7 @@ case class InteractiveRecoveryMetadata(
kind: Kind,
heartbeatTimeoutS: Int,
owner: String,
+ ttl: Option[String],
proxyUser: Option[String],
rscDriverUri: Option[URI],
version: Int = 1)
@@ -73,6 +74,7 @@ object InteractiveSession extends Logging {
accessManager: AccessManager,
request: CreateInteractiveRequest,
sessionStore: SessionStore,
+ ttl: Option[String],
mockApp: Option[SparkApp] = None,
mockClient: Option[RSCClient] = None): InteractiveSession = {
val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}"
@@ -123,6 +125,7 @@ object InteractiveSession extends Logging {
livyConf,
owner,
impersonatedUser,
+ ttl,
sessionStore,
mockApp)
}
@@ -150,6 +153,7 @@ object InteractiveSession extends Logging {
livyConf,
metadata.owner,
metadata.proxyUser,
+ metadata.ttl,
sessionStore,
mockApp)
}
@@ -371,9 +375,10 @@ class InteractiveSession(
livyConf: LivyConf,
owner: String,
override val proxyUser: Option[String],
+ ttl: Option[String],
sessionStore: SessionStore,
mockApp: Option[SparkApp]) // For unit test.
- extends Session(id, name, owner, livyConf)
+ extends Session(id, name, owner, ttl, livyConf)
with SessionHeartbeat
with SparkAppListener {
@@ -464,7 +469,7 @@ class InteractiveSession(
override def recoveryMetadata: RecoveryMetadata =
InteractiveRecoveryMetadata(id, name, appId, appTag, kind,
- heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)
+ heartbeatTimeout.toSeconds.toInt, owner, None, proxyUser, rscDriverUri)
override def state: SessionState = {
if (serverSideState == SessionState.Running) {
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index 3b700875..4440c775 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -29,6 +29,7 @@ import org.scalatra._
import org.scalatra.servlet.FileUploadSupport
import org.apache.livy.{CompletionRequest, ExecuteRequest, JobHandle, LivyConf, Logging}
+import org.apache.livy.client.common.ClientConf
import org.apache.livy.client.common.HttpMessages
import org.apache.livy.client.common.HttpMessages._
import org.apache.livy.server.{AccessManager, SessionServlet}
@@ -52,15 +53,23 @@ class InteractiveSessionServlet(
override protected def createSession(req: HttpServletRequest): InteractiveSession = {
val createRequest = bodyAs[CreateInteractiveRequest](req)
+ val sessionId = sessionManager.nextId();
+
+ // Calling getTimeAsMs just to validate the ttl value
+ if (createRequest.ttl.isDefined) {
+ ClientConf.getTimeAsMs(createRequest.ttl.get);
+ }
+
InteractiveSession.create(
- sessionManager.nextId(),
+ sessionId,
createRequest.name,
remoteUser(req),
proxyUser(req, createRequest.proxyUser),
livyConf,
accessManager,
createRequest,
- sessionStore)
+ sessionStore,
+ createRequest.ttl)
}
override protected[interactive] def clientSessionView(
@@ -85,7 +94,7 @@ class InteractiveSessionServlet(
new SessionInfo(session.id, session.name.orNull, session.appId.orNull, session.owner,
session.proxyUser.orNull, session.state.toString, session.kind.toString,
- session.appInfo.asJavaMap, logs.asJava)
+ session.appInfo.asJavaMap, logs.asJava, session.ttl.orNull)
}
post("/:id/stop") {
diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala b/server/src/main/scala/org/apache/livy/sessions/Session.scala
index 67f78d4d..197e22f9 100644
--- a/server/src/main/scala/org/apache/livy/sessions/Session.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala
@@ -138,9 +138,17 @@ abstract class Session(
val id: Int,
val name: Option[String],
val owner: String,
+ val ttl: Option[String],
val livyConf: LivyConf)
extends Logging {
+ def this(id: Int,
+ name: Option[String],
+ owner: String,
+ livyConf: LivyConf) {
+ this(id, name, owner, None, livyConf)
+ }
+
import Session._
protected implicit val executionContext = ExecutionContext.global
@@ -164,6 +172,7 @@ abstract class Session(
var appInfo: AppInfo = AppInfo()
+
def lastActivity: Long = state match {
case SessionState.Error(time) => time
case SessionState.Dead(time) => time
diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
index f2548ac0..25bf593d 100644
--- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
@@ -27,6 +27,7 @@ import scala.reflect.ClassTag
import scala.util.control.NonFatal
import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.client.common.ClientConf
import org.apache.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
import org.apache.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession, SessionHeartbeatWatchdog}
import org.apache.livy.server.recovery.SessionStore
@@ -79,8 +80,9 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK)
private[this] final val sessionTimeoutCheckSkipBusy =
livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK_SKIP_BUSY)
- private[this] final val sessionTimeout =
- TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT))
+
+ private[this] final val sessionTimeout = livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT)
+
private[this] final val sessionStateRetainedInSec =
TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME))
@@ -168,7 +170,12 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
false
} else {
val currentTime = System.nanoTime()
- currentTime - session.lastActivity > sessionTimeout
+ var calculatedTimeout = sessionTimeout;
+ if (session.ttl.isDefined) {
+ calculatedTimeout = ClientConf.getTimeAsMs(session.ttl.get)
+ }
+ calculatedTimeout = TimeUnit.MILLISECONDS.toNanos(calculatedTimeout)
+ currentTime - session.lastActivity > calculatedTimeout
}
}
}
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
index 78407d5a..0f1cdc7f 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -69,6 +69,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
when(session.stop()).thenReturn(Future.successful(()))
when(session.proxyUser).thenReturn(None)
when(session.heartbeatExpired).thenReturn(false)
+ when(session.ttl).thenReturn(None)
when(session.statements).thenAnswer(
new Answer[IndexedSeq[Statement]]() {
override def answer(args: InvocationOnMock): IndexedSeq[Statement] = statements
@@ -182,6 +183,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
when(session.appInfo).thenReturn(appInfo)
when(session.logLines()).thenReturn(log)
when(session.heartbeatExpired).thenReturn(false)
+ when(session.ttl).thenReturn(None)
val req = mock[HttpServletRequest]
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index d13e6826..02aca27a 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -71,7 +71,7 @@ class InteractiveSessionSpec extends FunSpec
RSCConf.Entry.LIVY_JARS.key() -> ""
)
InteractiveSession.create(0, None, null, None, livyConf, accessManager, req,
- sessionStore, mockApp)
+ sessionStore, None, mockApp)
}
private def executeStatement(code: String, codeType: Option[String] = None): JValue = {
@@ -277,7 +277,8 @@ class InteractiveSessionSpec extends FunSpec
val mockClient = mock[RSCClient]
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m = InteractiveRecoveryMetadata(
- 78, Some("Test session"), None, "appTag", Spark, 0, null, None, Some(URI.create("")))
+ 78, Some("Test session"), None, "appTag", Spark, 0, null, None, None,
+ Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
s.start()
@@ -294,7 +295,7 @@ class InteractiveSessionSpec extends FunSpec
val mockClient = mock[RSCClient]
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m = InteractiveRecoveryMetadata(
- 78, None, None, "appTag", Spark, 0, null, None, Some(URI.create("")))
+ 78, None, None, "appTag", Spark, 0, null, None, None, Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
s.start()
@@ -309,7 +310,7 @@ class InteractiveSessionSpec extends FunSpec
val conf = new LivyConf()
val sessionStore = mock[SessionStore]
val m = InteractiveRecoveryMetadata(
- 78, None, Some("appId"), "appTag", Spark, 0, null, None, None)
+ 78, None, Some("appId"), "appTag", Spark, 0, null, None, None, None)
val s = InteractiveSession.recover(m, conf, sessionStore, None)
s.start()
s.state shouldBe a[SessionState.Dead]
diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
index f9609b19..e28d566b 100644
--- a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
@@ -19,8 +19,9 @@ package org.apache.livy.sessions
import org.apache.livy.LivyConf
-class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] = None)
- extends Session(id, name, owner, conf) {
+class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] = None,
+ ttl: Option[String] = None)
+ extends Session(id, name, owner, ttl, conf) {
case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata()
override val proxyUser = None
diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
index 8014d4ab..c32eebaf 100644
--- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
@@ -59,6 +59,17 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
}
}
+ it("should garbage collect old sessions with ttl") {
+ val (livyConf, manager) = createSessionManager()
+ val session = manager.register(new MockSession(manager.nextId(), null, livyConf,
+ None, Some("4s")))
+ manager.get(session.id).isDefined should be(true)
+ eventually(timeout(5 seconds), interval(100 millis)) {
+ Await.result(manager.collectGarbage(), Duration.Inf)
+ manager.get(session.id) should be(None)
+ }
+ }
+
it("should not garbage collect busy sessions if skip-busy configured") {
val lc = new LivyConf()
lc.set(LivyConf.SESSION_TIMEOUT_CHECK_SKIP_BUSY, true)
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 0b4bbd54..11294db8 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -235,7 +235,8 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
server.livyConf,
server.accessManager,
createInteractiveRequest,
- server.sessionStore)
+ server.sessionStore,
+ None)
onLivySessionOpened(newSession)
newSession
}