You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by mg...@apache.org on 2023/09/05 09:57:57 UTC

[incubator-livy] branch master updated: [LIVY-987] NPE when waiting for thrift session to start timeout.

This is an automated email from the ASF dual-hosted git repository.

mgaido pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new bf30e3bd [LIVY-987] NPE when waiting for thrift session to start timeout.
bf30e3bd is described below

commit bf30e3bd3105f72a6a7fd2a276a16dabe8bdf104
Author: jianzhen.wu <ji...@shopee.com>
AuthorDate: Tue Sep 5 11:57:36 2023 +0200

    [LIVY-987] NPE when waiting for thrift session to start timeout.
    
    ## What changes were proposed in this pull request?
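    Fix the NullPointerException thrown when waiting for a Thrift session to start times out.
    When the failure has no cause (for example the TimeoutException raised by Await.result),
    `e.getCause` is null and `throw e.getCause` raises an NPE; the original exception is now
    thrown instead whenever the cause is null.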
    https://issues.apache.org/jira/browse/LIVY-987
    
    ## How was this patch tested?
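    Tested manually with beeline, with the session creation timeout set to 10s (output below).
    Unit tests covering both the timeout and the failed-future paths were also added to
    TestLivyThriftSessionManager.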
    
    0: jdbc:hive2://username:passwordthrift-server> select 123;
    
    RSC client is executing SQL query: select 123, statementId = 681bc017-8f37-4665-a575-da355db77254, session = SessionHandle [c17f1729-6ee1-4260-b82b-aebec3b08e14]
    
    Livy session has not yet started. Please wait for it to be ready...
    
    Error: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] (state=,code=0)
    
    0: jdbc:hive2://username:passwordthrift-server> Closing: 0: jdbc:hive2://username:passwordthrift-server
    
    Please review https://livy.incubator.apache.org/community/ before opening a pull request.
    
    Author: jianzhen.wu <ji...@shopee.com>
    Author: jianzhenwu <11...@users.noreply.github.com>
    
    Closes #416 from jianzhenwu/LIVY-987.
---
 .../thriftserver/LivyThriftSessionManager.scala    |  6 +--
 .../TestLivyThriftSessionManager.scala             | 47 +++++++++++++++++++++-
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 11294db8..54208a65 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -50,7 +50,7 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
   extends ThriftService(classOf[LivyThriftSessionManager].getName) with Logging {
 
   private[thriftserver] val operationManager = new LivyOperationManager(this)
-  private val sessionHandleToLivySession =
+  private[thriftserver] val sessionHandleToLivySession =
     new ConcurrentHashMap[SessionHandle, Future[InteractiveSession]]()
   // A map which returns how many incoming connections are open for a Livy session.
   // This map tracks only the sessions created by the Livy thriftserver and not those which have
@@ -95,12 +95,12 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     if (!future.isCompleted) {
       Try(Await.result(future, maxSessionWait)) match {
         case Success(session) => session
-        case Failure(e) => throw e.getCause
+        case Failure(e) => throw Option(e.getCause).getOrElse(e)
       }
     } else {
       future.value match {
         case Some(Success(session)) => session
-        case Some(Failure(e)) => throw e.getCause
+        case Some(Failure(e)) => throw Option(e.getCause).getOrElse(e)
         case None => throw new RuntimeException("Future cannot be None when it is completed")
       }
     }
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
index fee801e3..11eea31f 100644
--- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
@@ -17,15 +17,24 @@
 
 package org.apache.livy.thriftserver
 
-import org.apache.hive.service.cli.HiveSQLException
+import java.util.concurrent.TimeoutException
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+
+import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
 import org.junit.Assert._
 import org.junit.Test
 import org.mockito.Mockito.mock
 
 import org.apache.livy.LivyConf
 import org.apache.livy.server.AccessManager
+import org.apache.livy.server.interactive.InteractiveSession
 import org.apache.livy.server.recovery.{SessionStore, StateStore}
 import org.apache.livy.sessions.InteractiveSessionManager
+import org.apache.livy.utils.Clock.sleep
 
 object ConnectionLimitType extends Enumeration {
   type ConnectionLimitType = Value
@@ -49,6 +58,18 @@ class TestLivyThriftSessionManager {
       }
       conf.set(entry, limit)
     }
+    this.createThriftSessionManager(conf)
+  }
+
+  private def createThriftSessionManager(
+      maxSessionWait: Option[String]): LivyThriftSessionManager = {
+    val conf = new LivyConf()
+    conf.set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+    maxSessionWait.foreach(conf.set(LivyConf.THRIFT_SESSION_CREATION_TIMEOUT, _))
+    this.createThriftSessionManager(conf)
+  }
+
+  private def createThriftSessionManager(conf: LivyConf): LivyThriftSessionManager = {
     val server = new LivyThriftServer(
       conf,
       mock(classOf[InteractiveSessionManager]),
@@ -142,4 +163,28 @@ class TestLivyThriftSessionManager {
     val msg = s"Connection limit per user reached (user: $user limit: 3)"
     testLimit(thriftSessionMgr, user, ipAddress, forwardedAddresses, msg)
   }
+
+  @Test(expected = classOf[TimeoutException])
+  def testGetLivySessionWaitForTimeout(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(Some("10ms"))
+    val sessionHandle = mock(classOf[SessionHandle])
+    val future = Future[InteractiveSession] {
+      sleep(100)
+      mock(classOf[InteractiveSession])
+    }
+    thriftSessionMgr.sessionHandleToLivySession.put(sessionHandle, future)
+    thriftSessionMgr.getLivySession(sessionHandle)
+  }
+
+  @Test(expected = classOf[TimeoutException])
+  def testGetLivySessionWithTimeoutException(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(None)
+    val sessionHandle = mock(classOf[SessionHandle])
+    val future = Future[InteractiveSession] {
+      throw new TimeoutException("Actively throw TimeoutException in Future.")
+    }
+    thriftSessionMgr.sessionHandleToLivySession.put(sessionHandle, future)
+    Await.ready(future, Duration(30, TimeUnit.SECONDS))
+    thriftSessionMgr.getLivySession(sessionHandle)
+  }
 }