You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/02/25 06:04:46 UTC
[kyuubi] branch master updated: [KYUUBI #4412] Align the server session handle and engine session handle for Spark engine
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c0241052a [KYUUBI #4412] Align the server session handle and engine session handle for Spark engine
c0241052a is described below
commit c0241052ae156971a07ee30d89bc4a02d6115d3e
Author: fwang12 <fw...@ebay.com>
AuthorDate: Sat Feb 25 14:04:38 2023 +0800
[KYUUBI #4412] Align the server session handle and engine session handle for Spark engine
### _Why are the changes needed?_
Align the server session handle and engine session handle for Spark engine.
This makes it easy to recover the engine session from any Kyuubi instance.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4412 from turboFei/server_engine_handle_align.
Closes #4412
a20e0f155 [fwang12] fix
9d590e38b [fwang12] fix
94267e583 [fwang12] save
7012c2bef [fwang12] align
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../spark/session/SparkSQLSessionManager.scala | 31 ++++++++++++----------
.../engine/spark/session/SparkSessionImpl.scala | 5 +++-
.../apache/kyuubi/config/KyuubiReservedKeys.scala | 1 +
.../kyuubi/client/KyuubiSyncThriftClient.scala | 3 +++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 7 ++---
.../operation/KyuubiOperationPerUserSuite.scala | 9 +++++++
6 files changed, 38 insertions(+), 18 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index cb8702df3..091149d21 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
@@ -135,21 +136,23 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
- val sparkSession =
- try {
- getOrNewSparkSession(user)
- } catch {
- case e: Exception => throw KyuubiSQLException(e)
- }
+ getSessionOption(SessionHandle.fromUUID(conf(KYUUBI_SESSION_HANDLE_KEY))).getOrElse {
+ val sparkSession =
+ try {
+ getOrNewSparkSession(user)
+ } catch {
+ case e: Exception => throw KyuubiSQLException(e)
+ }
- new SparkSessionImpl(
- protocol,
- user,
- password,
- ipAddress,
- conf,
- this,
- sparkSession)
+ new SparkSessionImpl(
+ protocol,
+ user,
+ password,
+ ipAddress,
+ conf,
+ this,
+ sparkSession)
+ }
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 5bf1ec084..30155c8f2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -21,13 +21,14 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.spark.events.SessionEvent
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
-import org.apache.kyuubi.session.{AbstractSession, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
class SparkSessionImpl(
protocol: TProtocolVersion,
@@ -39,6 +40,8 @@ class SparkSessionImpl(
val spark: SparkSession)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
+ override val handle: SessionHandle = SessionHandle.fromUUID(conf(KYUUBI_SESSION_HANDLE_KEY))
+
private def setModifiableConfig(key: String, value: String): Unit = {
try {
spark.conf.set(key, value)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index dfbdd9449..85874d3b9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -33,6 +33,7 @@ object KyuubiReservedKeys {
final val KYUUBI_ENGINE_URL = "kyuubi.engine.url"
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
+ final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
"kyuubi.session.engine.launch.handle.guid"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET =
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index babe73745..dbca8ca32 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -53,6 +53,9 @@ class KyuubiSyncThriftClient private (
private val lock = new ReentrantLock()
+ // Visible for testing.
+ private[kyuubi] def remoteSessionHandle: TSessionHandle = _remoteSessionHandle
+
@volatile private var _aliveProbeSessionHandle: TSessionHandle = _
@volatile private var remoteEngineBroken: Boolean = false
private val engineAliveProbeClient = engineAliveProbeProtocol.map(new TCLIService.Client(_))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index ce94b0275..e4203b301 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -27,7 +27,7 @@ import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_HANDLE_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN}
import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
@@ -121,11 +121,12 @@ class KyuubiSessionImpl(
private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
handleSessionException {
withDiscoveryClient(sessionConf) { discoveryClient =>
- var openEngineSessionConf = optimizedConf
+ var openEngineSessionConf =
+ optimizedConf ++ Map(KYUUBI_SESSION_HANDLE_KEY -> handle.identifier.toString)
if (engineCredentials.nonEmpty) {
sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
openEngineSessionConf =
- optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
+ openEngineSessionConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
}
if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index 2ece2379a..26fc89ba2 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -364,4 +364,13 @@ class KyuubiOperationPerUserSuite
val snapshot = MetricsSystem.histogramSnapshot(metric).get
assert(snapshot.getMax > 0 && snapshot.getMedian > 0)
}
+
+ test("align the server session handle and engine session handle for Spark engine") {
+ withJdbcStatement() { _ =>
+ val session =
+ server.backendService.sessionManager.allSessions().head.asInstanceOf[KyuubiSessionImpl]
+ val engineSessionHandle = SessionHandle.apply(session.client.remoteSessionHandle)
+ assert(session.handle === engineSessionHandle)
+ }
+ }
}