You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/02/27 12:57:29 UTC

[kyuubi] branch master updated: [KYUUBI #4412][FOLLOWUP] Align the server/engine session handle for flink/hive/trino/jdbc engines

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a7c45b8e [KYUUBI #4412][FOLLOWUP] Align the server/engine session handle for flink/hive/trino/jdbc engines
0a7c45b8e is described below

commit 0a7c45b8eec4998bb8ff35e65e2b805ea7370bf1
Author: fwang12 <fw...@ebay.com>
AuthorDate: Mon Feb 27 20:57:19 2023 +0800

    [KYUUBI #4412][FOLLOWUP] Align the server/engine session handle for flink/hive/trino/jdbc engines
    
    ### _Why are the changes needed?_
    
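    Follow-up to #4412: align the engine-side session handle with the server-side session handle for the Flink, Hive, Trino, and JDBC engines.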
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4422 from turboFei/align_session_id.
    
    Closes #4412
    
    319373296 [fwang12] save
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../flink/session/FlinkSQLSessionManager.scala     | 20 +++++----
 .../engine/flink/session/FlinkSessionImpl.scala    |  6 ++-
 .../engine/hive/session/HiveSessionManager.scala   | 50 ++++++++++++----------
 .../engine/jdbc/session/JdbcSessionImpl.scala      |  6 ++-
 .../engine/jdbc/session/JdbcSessionManager.scala   |  6 ++-
 .../engine/trino/session/TrinoSessionImpl.scala    |  6 ++-
 .../engine/trino/session/TrinoSessionManager.scala |  6 ++-
 7 files changed, 65 insertions(+), 35 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 8a3fc7446..07971e39f 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -21,6 +21,7 @@ import org.apache.flink.table.client.gateway.context.DefaultContext
 import org.apache.flink.table.client.gateway.local.LocalExecutor
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
 import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
 
@@ -43,14 +44,17 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
       password: String,
       ipAddress: String,
       conf: Map[String, String]): Session = {
-    new FlinkSessionImpl(
-      protocol,
-      user,
-      password,
-      ipAddress,
-      conf,
-      this,
-      executor)
+    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
+      getSessionOption).getOrElse {
+      new FlinkSessionImpl(
+        protocol,
+        user,
+        password,
+        ipAddress,
+        conf,
+        this,
+        executor)
+    }
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 03d9ce42e..75087b48c 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -26,8 +26,9 @@ import org.apache.flink.table.client.gateway.local.LocalExecutor
 import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
 
 import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.flink.FlinkEngineUtils
-import org.apache.kyuubi.session.{AbstractSession, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
 
 class FlinkSessionImpl(
     protocol: TProtocolVersion,
@@ -39,6 +40,9 @@ class FlinkSessionImpl(
     val executor: LocalExecutor)
   extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
 
+  override val handle: SessionHandle =
+    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+
   lazy val sessionContext: SessionContext = {
     FlinkEngineUtils.getSessionContext(executor, handle.identifier.toString)
   }
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
index dc807429c..d09912770 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
@@ -28,6 +28,7 @@ import org.apache.hive.service.cli.session.{HiveSessionImplwithUGI => ImportedHi
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.hive.HiveSQLEngine
 import org.apache.kyuubi.engine.hive.operation.HiveOperationManager
@@ -72,33 +73,38 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
       password: String,
       ipAddress: String,
       conf: Map[String, String]): Session = {
-    val sessionHandle = SessionHandle()
-    val hive = {
-      val sessionWithUGI = new ImportedHiveSessionImpl(
-        new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
+    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
+      getSessionOption).getOrElse {
+      val sessionHandle =
+        conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+      val hive = {
+        val sessionWithUGI = new ImportedHiveSessionImpl(
+          new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
+          protocol,
+          user,
+          password,
+          HiveSQLEngine.hiveConf,
+          ipAddress,
+          null,
+          Seq(ipAddress).asJava)
+        val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi)
+        sessionWithUGI.setProxySession(proxy)
+        proxy
+      }
+      hive.setSessionManager(internalSessionManager)
+      hive.setOperationManager(internalSessionManager.getOperationManager)
+      operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
+      new HiveSessionImpl(
         protocol,
         user,
         password,
-        HiveSQLEngine.hiveConf,
         ipAddress,
-        null,
-        Seq(ipAddress).asJava)
-      val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi)
-      sessionWithUGI.setProxySession(proxy)
-      proxy
+        conf,
+        this,
+        sessionHandle,
+        hive)
     }
-    hive.setSessionManager(internalSessionManager)
-    hive.setOperationManager(internalSessionManager.getOperationManager)
-    operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
-    new HiveSessionImpl(
-      protocol,
-      user,
-      password,
-      ipAddress,
-      conf,
-      this,
-      sessionHandle,
-      hive)
+
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala
index 63fb2dd07..f8cd40412 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala
@@ -23,8 +23,9 @@ import scala.util.{Failure, Success, Try}
 import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
 
 import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
-import org.apache.kyuubi.session.{AbstractSession, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
 
 class JdbcSessionImpl(
     protocol: TProtocolVersion,
@@ -35,6 +36,9 @@ class JdbcSessionImpl(
     sessionManager: SessionManager)
   extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
 
+  override val handle: SessionHandle =
+    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+
   private[jdbc] var sessionConnection: Connection = _
 
   private var databaseMetaData: DatabaseMetaData = _
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala
index db8f60c3c..09958e050 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala
@@ -20,6 +20,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.jdbc.JdbcSQLEngine
 import org.apache.kyuubi.engine.jdbc.operation.JdbcOperationManager
@@ -46,7 +47,10 @@ class JdbcSessionManager(name: String)
       password: String,
       ipAddress: String,
       conf: Map[String, String]): Session = {
-    new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this)
+    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
+      getSessionOption).getOrElse {
+      new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this)
+    }
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index a19d74d58..81f973b1b 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -30,11 +30,12 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.Utils.currentUser
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement}
 import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
-import org.apache.kyuubi.session.{AbstractSession, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
 
 class TrinoSessionImpl(
     protocol: TProtocolVersion,
@@ -45,6 +46,9 @@ class TrinoSessionImpl(
     sessionManager: SessionManager)
   extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
 
+  override val handle: SessionHandle =
+    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+
   var trinoContext: TrinoContext = _
   private var clientSession: ClientSession = _
   private var catalogName: String = null
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
index 6d56d5c05..e18b8f758 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.trino.session
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.trino.TrinoSqlEngine
 import org.apache.kyuubi.engine.trino.operation.TrinoOperationManager
@@ -36,7 +37,10 @@ class TrinoSessionManager
       password: String,
       ipAddress: String,
       conf: Map[String, String]): Session = {
-    new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
+    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
+      getSessionOption).getOrElse {
+      new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
+    }
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {