You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/02/25 06:04:46 UTC

[kyuubi] branch master updated: [KYUUBI #4412] Align the server session handle and engine session handle for Spark engine

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c0241052a [KYUUBI #4412] Align the server session handle and engine session handle for Spark engine
c0241052a is described below

commit c0241052ae156971a07ee30d89bc4a02d6115d3e
Author: fwang12 <fw...@ebay.com>
AuthorDate: Sat Feb 25 14:04:38 2023 +0800

    [KYUUBI #4412] Align the server session handle and engine session handle for Spark engine
    
    ### _Why are the changes needed?_
    
    Align the server session handle and engine session handle for Spark engine.
    
    It make it easy to recovery the engine session in any kyuubi instance easy.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4412 from turboFei/server_engine_handle_align.
    
    Closes #4412
    
    a20e0f155 [fwang12] fix
    9d590e38b [fwang12] fix
    94267e583 [fwang12] save
    7012c2bef [fwang12] align
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../spark/session/SparkSQLSessionManager.scala     | 31 ++++++++++++----------
 .../engine/spark/session/SparkSessionImpl.scala    |  5 +++-
 .../apache/kyuubi/config/KyuubiReservedKeys.scala  |  1 +
 .../kyuubi/client/KyuubiSyncThriftClient.scala     |  3 +++
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |  7 ++---
 .../operation/KyuubiOperationPerUserSuite.scala    |  9 +++++++
 6 files changed, 38 insertions(+), 18 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index cb8702df3..091149d21 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel._
 import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
@@ -135,21 +136,23 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
       password: String,
       ipAddress: String,
       conf: Map[String, String]): Session = {
-    val sparkSession =
-      try {
-        getOrNewSparkSession(user)
-      } catch {
-        case e: Exception => throw KyuubiSQLException(e)
-      }
+    getSessionOption(SessionHandle.fromUUID(conf(KYUUBI_SESSION_HANDLE_KEY))).getOrElse {
+      val sparkSession =
+        try {
+          getOrNewSparkSession(user)
+        } catch {
+          case e: Exception => throw KyuubiSQLException(e)
+        }
 
-    new SparkSessionImpl(
-      protocol,
-      user,
-      password,
-      ipAddress,
-      conf,
-      this,
-      sparkSession)
+      new SparkSessionImpl(
+        protocol,
+        user,
+        password,
+        ipAddress,
+        conf,
+        this,
+        sparkSession)
+    }
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 5bf1ec084..30155c8f2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -21,13 +21,14 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 
 import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.spark.events.SessionEvent
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
 import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
 import org.apache.kyuubi.engine.spark.udf.KDFRegistry
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
-import org.apache.kyuubi.session.{AbstractSession, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
 
 class SparkSessionImpl(
     protocol: TProtocolVersion,
@@ -39,6 +40,8 @@ class SparkSessionImpl(
     val spark: SparkSession)
   extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
 
+  override val handle: SessionHandle = SessionHandle.fromUUID(conf(KYUUBI_SESSION_HANDLE_KEY))
+
   private def setModifiableConfig(key: String, value: String): Unit = {
     try {
       spark.conf.set(key, value)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index dfbdd9449..85874d3b9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -33,6 +33,7 @@ object KyuubiReservedKeys {
   final val KYUUBI_ENGINE_URL = "kyuubi.engine.url"
   final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
   final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
+  final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
   final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
     "kyuubi.session.engine.launch.handle.guid"
   final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET =
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index babe73745..dbca8ca32 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -53,6 +53,9 @@ class KyuubiSyncThriftClient private (
 
   private val lock = new ReentrantLock()
 
+  // Visible for testing.
+  private[kyuubi] def remoteSessionHandle: TSessionHandle = _remoteSessionHandle
+
   @volatile private var _aliveProbeSessionHandle: TSessionHandle = _
   @volatile private var remoteEngineBroken: Boolean = false
   private val engineAliveProbeClient = engineAliveProbeProtocol.map(new TCLIService.Client(_))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index ce94b0275..e4203b301 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -27,7 +27,7 @@ import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_HANDLE_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN}
 import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager}
 import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
@@ -121,11 +121,12 @@ class KyuubiSessionImpl(
   private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
     handleSessionException {
       withDiscoveryClient(sessionConf) { discoveryClient =>
-        var openEngineSessionConf = optimizedConf
+        var openEngineSessionConf =
+          optimizedConf ++ Map(KYUUBI_SESSION_HANDLE_KEY -> handle.identifier.toString)
         if (engineCredentials.nonEmpty) {
           sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
           openEngineSessionConf =
-            optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
+            openEngineSessionConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
         }
 
         if (sessionConf.get(SESSION_USER_SIGN_ENABLED)) {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index 2ece2379a..26fc89ba2 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -364,4 +364,13 @@ class KyuubiOperationPerUserSuite
     val snapshot = MetricsSystem.histogramSnapshot(metric).get
     assert(snapshot.getMax > 0 && snapshot.getMedian > 0)
   }
+
+  test("align the server session handle and engine session handle for Spark engine") {
+    withJdbcStatement() { _ =>
+      val session =
+        server.backendService.sessionManager.allSessions().head.asInstanceOf[KyuubiSessionImpl]
+      val engineSessionHandle = SessionHandle.apply(session.client.remoteSessionHandle)
+      assert(session.handle === engineSessionHandle)
+    }
+  }
 }