You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/01/19 04:36:16 UTC
[kyuubi] branch master updated: [KYUUBI #4185] Handle session exception for KyuubiSessionImpl::openEngineSession
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b7efed958 [KYUUBI #4185] Handle session exception for KyuubiSessionImpl::openEngineSession
b7efed958 is described below
commit b7efed958fc0de5e281166b23f60cc351aedd9c2
Author: fwang12 <fw...@ebay.com>
AuthorDate: Thu Jan 19 12:36:07 2023 +0800
[KYUUBI #4185] Handle session exception for KyuubiSessionImpl::openEngineSession
### _Why are the changes needed?_
This is helpful for the RESTful use case.
If the Kyuubi session fails to open, the user can still fetch the session info and check the exception.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4185 from turboFei/launchEngine_rest.
Closes #4185
1e6396454 [fwang12] comment
a534f3b7d [fwang12] fix ut
70631a294 [fwang12] save
67dd5b3a2 [fwang12] handle session exception when open engine session
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 136 +++++++++++----------
.../server/api/v1/SessionsResourceSuite.scala | 22 ++++
2 files changed, 91 insertions(+), 67 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index b66939096..93e6e9858 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -122,80 +122,82 @@ class KyuubiSessionImpl(
runOperation(launchEngineOp)
}
- private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = {
- withDiscoveryClient(sessionConf) { discoveryClient =>
- var openEngineSessionConf = optimizedConf
- if (engineCredentials.nonEmpty) {
- sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
- openEngineSessionConf =
- optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
- }
+ private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
+ handleSessionException {
+ withDiscoveryClient(sessionConf) { discoveryClient =>
+ var openEngineSessionConf = optimizedConf
+ if (engineCredentials.nonEmpty) {
+ sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
+ openEngineSessionConf =
+ optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
+ }
- if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) {
- openEngineSessionConf = openEngineSessionConf +
- (SESSION_USER_SIGN_ENABLED.key ->
- sessionConf.get(SESSION_USER_SIGN_ENABLED).toString) +
- (KYUUBI_SESSION_SIGN_PUBLICKEY ->
- Base64.getEncoder.encodeToString(
- sessionManager.signingPublicKey.getEncoded)) +
- (KYUUBI_SESSION_USER_SIGN -> sessionUserSignBase64)
- }
+ if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) {
+ openEngineSessionConf = openEngineSessionConf +
+ (SESSION_USER_SIGN_ENABLED.key ->
+ sessionConf.get(SESSION_USER_SIGN_ENABLED).toString) +
+ (KYUUBI_SESSION_SIGN_PUBLICKEY ->
+ Base64.getEncoder.encodeToString(
+ sessionManager.signingPublicKey.getEncoded)) +
+ (KYUUBI_SESSION_USER_SIGN -> sessionUserSignBase64)
+ }
- val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS)
- val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT)
- var attempt = 0
- var shouldRetry = true
- while (attempt <= maxAttempts && shouldRetry) {
- val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
- try {
- val passwd =
- if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
- InternalSecurityAccessor.get().issueToken()
- } else {
- Option(password).filter(_.nonEmpty).getOrElse("anonymous")
- }
- _client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
- _engineSessionHandle = _client.openSession(protocol, user, passwd, openEngineSessionConf)
- logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
- s" with ${_engineSessionHandle}]")
- shouldRetry = false
- } catch {
- case e: org.apache.thrift.transport.TTransportException
- if attempt < maxAttempts && e.getCause.isInstanceOf[java.net.ConnectException] &&
- e.getCause.getMessage.contains("Connection refused (Connection refused)") =>
- warn(
- s"Failed to open [${engine.defaultEngineName} $host:$port] after" +
- s" $attempt/$maxAttempts times, retrying",
- e.getCause)
- Thread.sleep(retryWait)
- shouldRetry = true
- case e: Throwable =>
- error(
- s"Opening engine [${engine.defaultEngineName} $host:$port]" +
- s" for $user session failed",
- e)
- throw e
- } finally {
- attempt += 1
- if (shouldRetry && _client != null) {
- try {
- _client.closeSession()
- } catch {
- case e: Throwable =>
- warn(
- "Error on closing broken client of engine " +
- s"[${engine.defaultEngineName} $host:$port]",
- e)
+ val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS)
+ val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT)
+ var attempt = 0
+ var shouldRetry = true
+ while (attempt <= maxAttempts && shouldRetry) {
+ val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
+ try {
+ val passwd =
+ if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
+ InternalSecurityAccessor.get().issueToken()
+ } else {
+ Option(password).filter(_.nonEmpty).getOrElse("anonymous")
+ }
+ _client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
+ _engineSessionHandle =
+ _client.openSession(protocol, user, passwd, openEngineSessionConf)
+ logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
+ s" with ${_engineSessionHandle}]")
+ shouldRetry = false
+ } catch {
+ case e: org.apache.thrift.transport.TTransportException
+ if attempt < maxAttempts && e.getCause.isInstanceOf[java.net.ConnectException] &&
+ e.getCause.getMessage.contains("Connection refused (Connection refused)") =>
+ warn(
+ s"Failed to open [${engine.defaultEngineName} $host:$port] after" +
+ s" $attempt/$maxAttempts times, retrying",
+ e.getCause)
+ Thread.sleep(retryWait)
+ shouldRetry = true
+ case e: Throwable =>
+ error(
+ s"Opening engine [${engine.defaultEngineName} $host:$port]" +
+ s" for $user session failed",
+ e)
+ throw e
+ } finally {
+ attempt += 1
+ if (shouldRetry && _client != null) {
+ try {
+ _client.closeSession()
+ } catch {
+ case e: Throwable =>
+ warn(
+ "Error on closing broken client of engine " +
+ s"[${engine.defaultEngineName} $host:$port]",
+ e)
+ }
}
}
}
+ sessionEvent.openedTime = System.currentTimeMillis()
+ sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
+ _client.engineId.foreach(e => sessionEvent.engineId = e)
+ EventBus.post(sessionEvent)
}
- sessionEvent.openedTime = System.currentTimeMillis()
- sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
- _client.engineId.foreach(e => sessionEvent.engineId = e)
- EventBus.post(sessionEvent)
}
- }
override protected def runOperation(operation: Operation): OperationHandle = {
if (operation != launchEngineOp) {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index db5e1360b..1357245ad 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -31,6 +31,7 @@ import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
+import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.events.KyuubiSessionEvent
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.OperationHandle
@@ -277,4 +278,25 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
.post(Entity.entity(getCrossReferenceReq, MediaType.APPLICATION_JSON_TYPE))
assert(404 == response.getStatus)
}
+
+ test("post session exception if failed to open engine session") {
+ val requestObj = new SessionOpenRequest(
+ 1,
+ Map(
+ "spark.master" -> "invalid",
+ KyuubiConf.ENGINE_SHARE_LEVEL.key -> ShareLevel.CONNECTION.toString).asJava)
+
+ var response = webTarget.path("api/v1/sessions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+
+ val sessionHandle = response.readEntity(classOf[SessionHandle]).getIdentifier
+
+ eventually(timeout(1.minutes), interval(200.milliseconds)) {
+ response = webTarget.path(s"api/v1/sessions/$sessionHandle").request().get()
+ // will meet json parse exception with response.readEntity(classOf[KyuubiSessionEvent])
+ val sessionEvent = response.readEntity(classOf[String])
+ assert(sessionEvent.contains("SparkException: Master"))
+ }
+ }
}