You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by mg...@apache.org on 2023/09/05 09:57:57 UTC
[incubator-livy] branch master updated: [LIVY-987] NPE when waiting for thrift session to start timeout.
This is an automated email from the ASF dual-hosted git repository.
mgaido pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new bf30e3bd [LIVY-987] NPE when waiting for thrift session to start timeout.
bf30e3bd is described below
commit bf30e3bd3105f72a6a7fd2a276a16dabe8bdf104
Author: jianzhen.wu <ji...@shopee.com>
AuthorDate: Tue Sep 5 11:57:36 2023 +0200
[LIVY-987] NPE when waiting for thrift session to start timeout.
## What changes were proposed in this pull request?
Fix NPE when waiting for thrift session to start timeout.
https://issues.apache.org/jira/browse/LIVY-987
## How was this patch tested?
manual tests by beeline and set timeout to 10s.
0: jdbc:hive2://username:passwordthrift-server> select 123;
RSC client is executing SQL query: select 123, statementId = 681bc017-8f37-4665-a575-da355db77254, session = SessionHandle [c17f1729-6ee1-4260-b82b-aebec3b08e14]
Livy session has not yet started. Please wait for it to be ready...
Error: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] (state=,code=0)
0: jdbc:hive2://username:passwordthrift-server> Closing: 0: jdbc:hive2://username:passwordthrift-server
Please review https://livy.incubator.apache.org/community/ before opening a pull request.
Author: jianzhen.wu <ji...@shopee.com>
Author: jianzhenwu <11...@users.noreply.github.com>
Closes #416 from jianzhenwu/LIVY-987.
---
.../thriftserver/LivyThriftSessionManager.scala | 6 +--
.../TestLivyThriftSessionManager.scala | 47 +++++++++++++++++++++-
2 files changed, 49 insertions(+), 4 deletions(-)
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 11294db8..54208a65 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -50,7 +50,7 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
extends ThriftService(classOf[LivyThriftSessionManager].getName) with Logging {
private[thriftserver] val operationManager = new LivyOperationManager(this)
- private val sessionHandleToLivySession =
+ private[thriftserver] val sessionHandleToLivySession =
new ConcurrentHashMap[SessionHandle, Future[InteractiveSession]]()
// A map which returns how many incoming connections are open for a Livy session.
// This map tracks only the sessions created by the Livy thriftserver and not those which have
@@ -95,12 +95,12 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
if (!future.isCompleted) {
Try(Await.result(future, maxSessionWait)) match {
case Success(session) => session
- case Failure(e) => throw e.getCause
+ case Failure(e) => throw Option(e.getCause).getOrElse(e)
}
} else {
future.value match {
case Some(Success(session)) => session
- case Some(Failure(e)) => throw e.getCause
+ case Some(Failure(e)) => throw Option(e.getCause).getOrElse(e)
case None => throw new RuntimeException("Future cannot be None when it is completed")
}
}
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
index fee801e3..11eea31f 100644
--- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
@@ -17,15 +17,24 @@
package org.apache.livy.thriftserver
-import org.apache.hive.service.cli.HiveSQLException
+import java.util.concurrent.TimeoutException
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+
+import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
import org.junit.Assert._
import org.junit.Test
import org.mockito.Mockito.mock
import org.apache.livy.LivyConf
import org.apache.livy.server.AccessManager
+import org.apache.livy.server.interactive.InteractiveSession
import org.apache.livy.server.recovery.{SessionStore, StateStore}
import org.apache.livy.sessions.InteractiveSessionManager
+import org.apache.livy.utils.Clock.sleep
object ConnectionLimitType extends Enumeration {
type ConnectionLimitType = Value
@@ -49,6 +58,18 @@ class TestLivyThriftSessionManager {
}
conf.set(entry, limit)
}
+ this.createThriftSessionManager(conf)
+ }
+
+ private def createThriftSessionManager(
+ maxSessionWait: Option[String]): LivyThriftSessionManager = {
+ val conf = new LivyConf()
+ conf.set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+ maxSessionWait.foreach(conf.set(LivyConf.THRIFT_SESSION_CREATION_TIMEOUT, _))
+ this.createThriftSessionManager(conf)
+ }
+
+ private def createThriftSessionManager(conf: LivyConf): LivyThriftSessionManager = {
val server = new LivyThriftServer(
conf,
mock(classOf[InteractiveSessionManager]),
@@ -142,4 +163,28 @@ class TestLivyThriftSessionManager {
val msg = s"Connection limit per user reached (user: $user limit: 3)"
testLimit(thriftSessionMgr, user, ipAddress, forwardedAddresses, msg)
}
+
+ @Test(expected = classOf[TimeoutException])
+ def testGetLivySessionWaitForTimeout(): Unit = {
+ val thriftSessionMgr = createThriftSessionManager(Some("10ms"))
+ val sessionHandle = mock(classOf[SessionHandle])
+ val future = Future[InteractiveSession] {
+ sleep(100)
+ mock(classOf[InteractiveSession])
+ }
+ thriftSessionMgr.sessionHandleToLivySession.put(sessionHandle, future)
+ thriftSessionMgr.getLivySession(sessionHandle)
+ }
+
+ @Test(expected = classOf[TimeoutException])
+ def testGetLivySessionWithTimeoutException(): Unit = {
+ val thriftSessionMgr = createThriftSessionManager(None)
+ val sessionHandle = mock(classOf[SessionHandle])
+ val future = Future[InteractiveSession] {
+ throw new TimeoutException("Actively throw TimeoutException in Future.")
+ }
+ thriftSessionMgr.sessionHandleToLivySession.put(sessionHandle, future)
+ Await.ready(future, Duration(30, TimeUnit.SECONDS))
+ thriftSessionMgr.getLivySession(sessionHandle)
+ }
}