You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/01/19 04:36:16 UTC

[kyuubi] branch master updated: [KYUUBI #4185] Handle session exception for KyuubiSessionImpl::openEngineSession

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b7efed958 [KYUUBI #4185] Handle session exception for KyuubiSessionImpl::openEngineSession
b7efed958 is described below

commit b7efed958fc0de5e281166b23f60cc351aedd9c2
Author: fwang12 <fw...@ebay.com>
AuthorDate: Thu Jan 19 12:36:07 2023 +0800

    [KYUUBI #4185] Handle session exception for KyuubiSessionImpl::openEngineSession
    
    ### _Why are the changes needed?_
    
    It is helpful for restful use case.
    
    If the kyuubi session failed, customer can get the session info and check the exception.
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4185 from turboFei/launchEngine_rest.
    
    Closes #4185
    
    1e6396454 [fwang12] comment
    a534f3b7d [fwang12] fix ut
    70631a294 [fwang12] save
    67dd5b3a2 [fwang12] handle session exception when open engine session
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  | 136 +++++++++++----------
 .../server/api/v1/SessionsResourceSuite.scala      |  22 ++++
 2 files changed, 91 insertions(+), 67 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index b66939096..93e6e9858 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -122,80 +122,82 @@ class KyuubiSessionImpl(
     runOperation(launchEngineOp)
   }
 
-  private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = {
-    withDiscoveryClient(sessionConf) { discoveryClient =>
-      var openEngineSessionConf = optimizedConf
-      if (engineCredentials.nonEmpty) {
-        sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
-        openEngineSessionConf =
-          optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
-      }
+  private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
+    handleSessionException {
+      withDiscoveryClient(sessionConf) { discoveryClient =>
+        var openEngineSessionConf = optimizedConf
+        if (engineCredentials.nonEmpty) {
+          sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
+          openEngineSessionConf =
+            optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
+        }
 
-      if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) {
-        openEngineSessionConf = openEngineSessionConf +
-          (SESSION_USER_SIGN_ENABLED.key ->
-            sessionConf.get(SESSION_USER_SIGN_ENABLED).toString) +
-          (KYUUBI_SESSION_SIGN_PUBLICKEY ->
-            Base64.getEncoder.encodeToString(
-              sessionManager.signingPublicKey.getEncoded)) +
-          (KYUUBI_SESSION_USER_SIGN -> sessionUserSignBase64)
-      }
+        if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) {
+          openEngineSessionConf = openEngineSessionConf +
+            (SESSION_USER_SIGN_ENABLED.key ->
+              sessionConf.get(SESSION_USER_SIGN_ENABLED).toString) +
+            (KYUUBI_SESSION_SIGN_PUBLICKEY ->
+              Base64.getEncoder.encodeToString(
+                sessionManager.signingPublicKey.getEncoded)) +
+            (KYUUBI_SESSION_USER_SIGN -> sessionUserSignBase64)
+        }
 
-      val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS)
-      val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT)
-      var attempt = 0
-      var shouldRetry = true
-      while (attempt <= maxAttempts && shouldRetry) {
-        val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
-        try {
-          val passwd =
-            if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
-              InternalSecurityAccessor.get().issueToken()
-            } else {
-              Option(password).filter(_.nonEmpty).getOrElse("anonymous")
-            }
-          _client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
-          _engineSessionHandle = _client.openSession(protocol, user, passwd, openEngineSessionConf)
-          logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
-            s" with ${_engineSessionHandle}]")
-          shouldRetry = false
-        } catch {
-          case e: org.apache.thrift.transport.TTransportException
-              if attempt < maxAttempts && e.getCause.isInstanceOf[java.net.ConnectException] &&
-                e.getCause.getMessage.contains("Connection refused (Connection refused)") =>
-            warn(
-              s"Failed to open [${engine.defaultEngineName} $host:$port] after" +
-                s" $attempt/$maxAttempts times, retrying",
-              e.getCause)
-            Thread.sleep(retryWait)
-            shouldRetry = true
-          case e: Throwable =>
-            error(
-              s"Opening engine [${engine.defaultEngineName} $host:$port]" +
-                s" for $user session failed",
-              e)
-            throw e
-        } finally {
-          attempt += 1
-          if (shouldRetry && _client != null) {
-            try {
-              _client.closeSession()
-            } catch {
-              case e: Throwable =>
-                warn(
-                  "Error on closing broken client of engine " +
-                    s"[${engine.defaultEngineName} $host:$port]",
-                  e)
+        val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS)
+        val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT)
+        var attempt = 0
+        var shouldRetry = true
+        while (attempt <= maxAttempts && shouldRetry) {
+          val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
+          try {
+            val passwd =
+              if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
+                InternalSecurityAccessor.get().issueToken()
+              } else {
+                Option(password).filter(_.nonEmpty).getOrElse("anonymous")
+              }
+            _client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
+            _engineSessionHandle =
+              _client.openSession(protocol, user, passwd, openEngineSessionConf)
+            logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
+              s" with ${_engineSessionHandle}]")
+            shouldRetry = false
+          } catch {
+            case e: org.apache.thrift.transport.TTransportException
+                if attempt < maxAttempts && e.getCause.isInstanceOf[java.net.ConnectException] &&
+                  e.getCause.getMessage.contains("Connection refused (Connection refused)") =>
+              warn(
+                s"Failed to open [${engine.defaultEngineName} $host:$port] after" +
+                  s" $attempt/$maxAttempts times, retrying",
+                e.getCause)
+              Thread.sleep(retryWait)
+              shouldRetry = true
+            case e: Throwable =>
+              error(
+                s"Opening engine [${engine.defaultEngineName} $host:$port]" +
+                  s" for $user session failed",
+                e)
+              throw e
+          } finally {
+            attempt += 1
+            if (shouldRetry && _client != null) {
+              try {
+                _client.closeSession()
+              } catch {
+                case e: Throwable =>
+                  warn(
+                    "Error on closing broken client of engine " +
+                      s"[${engine.defaultEngineName} $host:$port]",
+                    e)
+              }
             }
           }
         }
+        sessionEvent.openedTime = System.currentTimeMillis()
+        sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
+        _client.engineId.foreach(e => sessionEvent.engineId = e)
+        EventBus.post(sessionEvent)
       }
-      sessionEvent.openedTime = System.currentTimeMillis()
-      sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
-      _client.engineId.foreach(e => sessionEvent.engineId = e)
-      EventBus.post(sessionEvent)
     }
-  }
 
   override protected def runOperation(operation: Operation): OperationHandle = {
     if (operation != launchEngineOp) {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index db5e1360b..1357245ad 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -31,6 +31,7 @@ import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
 import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
+import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.events.KyuubiSessionEvent
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.operation.OperationHandle
@@ -277,4 +278,25 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
       .post(Entity.entity(getCrossReferenceReq, MediaType.APPLICATION_JSON_TYPE))
     assert(404 == response.getStatus)
   }
+
+  test("post session exception if failed to open engine session") {
+    val requestObj = new SessionOpenRequest(
+      1,
+      Map(
+        "spark.master" -> "invalid",
+        KyuubiConf.ENGINE_SHARE_LEVEL.key -> ShareLevel.CONNECTION.toString).asJava)
+
+    var response = webTarget.path("api/v1/sessions")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+
+    val sessionHandle = response.readEntity(classOf[SessionHandle]).getIdentifier
+
+    eventually(timeout(1.minutes), interval(200.milliseconds)) {
+      response = webTarget.path(s"api/v1/sessions/$sessionHandle").request().get()
+      // will meet json parse exception with response.readEntity(classOf[KyuubiSessionEvent])
+      val sessionEvent = response.readEntity(classOf[String])
+      assert(sessionEvent.contains("SparkException: Master"))
+    }
+  }
 }