You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/02/27 12:57:29 UTC
[kyuubi] branch master updated: [KYUUBI #4412][FOLLOWUP] Align the server/engine session handle for flink/hive/trino/jdbc engines
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 0a7c45b8e [KYUUBI #4412][FOLLOWUP] Align the server/engine session handle for flink/hive/trino/jdbc engines
0a7c45b8e is described below
commit 0a7c45b8eec4998bb8ff35e65e2b805ea7370bf1
Author: fwang12 <fw...@ebay.com>
AuthorDate: Mon Feb 27 20:57:19 2023 +0800
[KYUUBI #4412][FOLLOWUP] Align the server/engine session handle for flink/hive/trino/jdbc engines
### _Why are the changes needed?_
#4412 follow up
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4422 from turboFei/align_session_id.
Closes #4412
319373296 [fwang12] save
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../flink/session/FlinkSQLSessionManager.scala | 20 +++++----
.../engine/flink/session/FlinkSessionImpl.scala | 6 ++-
.../engine/hive/session/HiveSessionManager.scala | 50 ++++++++++++----------
.../engine/jdbc/session/JdbcSessionImpl.scala | 6 ++-
.../engine/jdbc/session/JdbcSessionManager.scala | 6 ++-
.../engine/trino/session/TrinoSessionImpl.scala | 6 ++-
.../engine/trino/session/TrinoSessionManager.scala | 6 ++-
7 files changed, 65 insertions(+), 35 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 8a3fc7446..07971e39f 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -21,6 +21,7 @@ import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
@@ -43,14 +44,17 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
- new FlinkSessionImpl(
- protocol,
- user,
- password,
- ipAddress,
- conf,
- this,
- executor)
+ conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
+ getSessionOption).getOrElse {
+ new FlinkSessionImpl(
+ protocol,
+ user,
+ password,
+ ipAddress,
+ conf,
+ this,
+ executor)
+ }
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 03d9ce42e..75087b48c 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -26,8 +26,9 @@ import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
-import org.apache.kyuubi.session.{AbstractSession, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
class FlinkSessionImpl(
protocol: TProtocolVersion,
@@ -39,6 +40,9 @@ class FlinkSessionImpl(
val executor: LocalExecutor)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
+ override val handle: SessionHandle =
+ conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+
lazy val sessionContext: SessionContext = {
FlinkEngineUtils.getSessionContext(executor, handle.identifier.toString)
}
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
index dc807429c..d09912770 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
@@ -28,6 +28,7 @@ import org.apache.hive.service.cli.session.{HiveSessionImplwithUGI => ImportedHi
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.engine.hive.operation.HiveOperationManager
@@ -72,33 +73,38 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
- val sessionHandle = SessionHandle()
- val hive = {
- val sessionWithUGI = new ImportedHiveSessionImpl(
- new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
+ conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
+ getSessionOption).getOrElse {
+ val sessionHandle =
+ conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+ val hive = {
+ val sessionWithUGI = new ImportedHiveSessionImpl(
+ new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
+ protocol,
+ user,
+ password,
+ HiveSQLEngine.hiveConf,
+ ipAddress,
+ null,
+ Seq(ipAddress).asJava)
+ val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi)
+ sessionWithUGI.setProxySession(proxy)
+ proxy
+ }
+ hive.setSessionManager(internalSessionManager)
+ hive.setOperationManager(internalSessionManager.getOperationManager)
+ operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
+ new HiveSessionImpl(
protocol,
user,
password,
- HiveSQLEngine.hiveConf,
ipAddress,
- null,
- Seq(ipAddress).asJava)
- val proxy = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi)
- sessionWithUGI.setProxySession(proxy)
- proxy
+ conf,
+ this,
+ sessionHandle,
+ hive)
}
- hive.setSessionManager(internalSessionManager)
- hive.setOperationManager(internalSessionManager.getOperationManager)
- operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
- new HiveSessionImpl(
- protocol,
- user,
- password,
- ipAddress,
- conf,
- this,
- sessionHandle,
- hive)
+
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala
index 63fb2dd07..f8cd40412 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala
@@ -23,8 +23,9 @@ import scala.util.{Failure, Success, Try}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
-import org.apache.kyuubi.session.{AbstractSession, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
class JdbcSessionImpl(
protocol: TProtocolVersion,
@@ -35,6 +36,9 @@ class JdbcSessionImpl(
sessionManager: SessionManager)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
+ override val handle: SessionHandle =
+ conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+
private[jdbc] var sessionConnection: Connection = _
private var databaseMetaData: DatabaseMetaData = _
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala
index db8f60c3c..09958e050 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionManager.scala
@@ -20,6 +20,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.jdbc.JdbcSQLEngine
import org.apache.kyuubi.engine.jdbc.operation.JdbcOperationManager
@@ -46,7 +47,10 @@ class JdbcSessionManager(name: String)
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
- new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this)
+ conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
+ getSessionOption).getOrElse {
+ new JdbcSessionImpl(protocol, user, password, ipAddress, conf, this)
+ }
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index a19d74d58..81f973b1b 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -30,11 +30,12 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Utils.currentUser
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement}
import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
-import org.apache.kyuubi.session.{AbstractSession, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
class TrinoSessionImpl(
protocol: TProtocolVersion,
@@ -45,6 +46,9 @@ class TrinoSessionImpl(
sessionManager: SessionManager)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
+ override val handle: SessionHandle =
+ conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+
var trinoContext: TrinoContext = _
private var clientSession: ClientSession = _
private var catalogName: String = null
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
index 6d56d5c05..e18b8f758 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.trino.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.trino.TrinoSqlEngine
import org.apache.kyuubi.engine.trino.operation.TrinoOperationManager
@@ -36,7 +37,10 @@ class TrinoSessionManager
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
- new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
+ conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
+ getSessionOption).getOrElse {
+ new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
+ }
}
override def closeSession(sessionHandle: SessionHandle): Unit = {