You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/04/13 10:11:09 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2331] Add createSession method to further abstract openSession
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8a525e520 [KYUUBI #2331] Add createSession method to further abstract openSession
8a525e520 is described below
commit 8a525e520383278e88c7ffcef87931dc1601253c
Author: wforget <64...@qq.com>
AuthorDate: Wed Apr 13 18:10:59 2022 +0800
[KYUUBI #2331] Add createSession method to further abstract openSession
### _Why are the changes needed?_
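Close #2331: add an abstract `createSession` method to `SessionManager` so that `openSession` holds the shared open/register/cleanup logic and each engine's session manager only constructs its session.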
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2332 from wForget/KYUUBI-2331.
Closes #2331
ce7f32e8 [wforget] fix
b74efd80 [wforget] [KYUUBI-2331] Add createSession method to further abstract openSession
Authored-by: wforget <64...@qq.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
.../flink/session/FlinkSQLSessionManager.scala | 21 +++-----------
.../engine/hive/session/HiveSessionManager.scala | 21 +++-----------
.../spark/session/SparkSQLSessionManager.scala | 19 ++-----------
.../engine/trino/session/TrinoSessionManager.scala | 25 +++--------------
.../org/apache/kyuubi/session/SessionManager.scala | 29 +++++++++++++++++++-
.../apache/kyuubi/session/NoopSessionManager.scala | 8 ++----
.../kyuubi/session/KyuubiSessionManager.scala | 32 ++++++++++------------
7 files changed, 60 insertions(+), 95 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 4e4f49001..b073fc9cb 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -21,10 +21,9 @@ import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion
-import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
-import org.apache.kyuubi.session.{SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
class FlinkSQLSessionManager(engineContext: DefaultContext)
extends SessionManager("FlinkSQLSessionManager") {
@@ -39,12 +38,12 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
executor.start()
}
- override def openSession(
+ override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String]): SessionHandle = {
+ conf: Map[String, String]): Session = {
val sessionHandle = SessionHandle(protocol)
val sessionId = sessionHandle.identifier.toString
@@ -52,7 +51,7 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
executor.openSession(sessionId)
val sessionContext = FlinkEngineUtils.getSessionContext(executor, sessionId)
- val sessionImpl = new FlinkSessionImpl(
+ new FlinkSessionImpl(
protocol,
user,
password,
@@ -61,18 +60,6 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
this,
sessionHandle,
sessionContext)
-
- try {
- sessionImpl.open()
- setSession(sessionHandle, sessionImpl)
- info(s"$user's session with $sessionHandle is opened, current opening sessions" +
- s" $getOpenSessionCount")
- sessionHandle
- } catch {
- case e: Exception =>
- sessionImpl.close()
- throw KyuubiSQLException(e)
- }
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
index 2c74a735b..5f23f74a7 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
@@ -25,13 +25,12 @@ import org.apache.hive.service.cli.{SessionHandle => ImportedSessionHandle}
import org.apache.hive.service.cli.session.{HiveSessionImpl => ImportedHiveSessionImpl, SessionManager => ImportedHiveSessionManager}
import org.apache.hive.service.rpc.thrift.TProtocolVersion
-import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.engine.hive.operation.HiveOperationManager
import org.apache.kyuubi.operation.OperationManager
-import org.apache.kyuubi.session.{CLIENT_IP_KEY, SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{CLIENT_IP_KEY, Session, SessionHandle, SessionManager}
class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSessionManager") {
override protected def isServer: Boolean = false
@@ -65,15 +64,14 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
}
}
- override def openSession(
+ override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String]): SessionHandle = {
+ conf: Map[String, String]): Session = {
val sessionHandle = SessionHandle(protocol)
val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
- info(s"Opening session for $user@$clientIp")
val hive = new ImportedHiveSessionImpl(
new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
protocol,
@@ -85,7 +83,7 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
hive.setSessionManager(internalSessionManager)
hive.setOperationManager(internalSessionManager.getOperationManager)
operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
- val session = new HiveSessionImpl(
+ new HiveSessionImpl(
protocol,
user,
password,
@@ -95,17 +93,6 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
this,
sessionHandle,
hive)
- try {
- session.open()
- setSession(sessionHandle, session)
- info(s"$user's session with $sessionHandle is opened, current opening sessions" +
- s" $getOpenSessionCount")
- sessionHandle
- } catch {
- case e: Exception =>
- session.close()
- throw KyuubiSQLException(e)
- }
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index fc039d851..4e11f9bfd 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -51,14 +51,13 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
- override def openSession(
+ override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String]): SessionHandle = {
+ conf: Map[String, String]): Session = {
val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
- info(s"Opening session for $user@$clientIp")
val sparkSession =
try {
if (singleSparkSession) {
@@ -80,7 +79,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
case e: Exception => throw KyuubiSQLException(e)
}
- val session = new SparkSessionImpl(
+ new SparkSessionImpl(
protocol,
user,
password,
@@ -89,18 +88,6 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
conf,
this,
sparkSession)
- try {
- val handle = session.handle
- session.open()
- setSession(handle, session)
- info(s"$user's session with $handle is opened, current opening sessions" +
- s" $getOpenSessionCount")
- handle
- } catch {
- case e: Exception =>
- session.close()
- throw KyuubiSQLException(e)
- }
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
index 83af04a03..aa1823bc7 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.trino.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
-import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_LOG_DIR_ROOT
@@ -27,8 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.trino.TrinoSqlEngine
import org.apache.kyuubi.engine.trino.operation.TrinoOperationManager
-import org.apache.kyuubi.session.SessionHandle
-import org.apache.kyuubi.session.SessionManager
+import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
class TrinoSessionManager
extends SessionManager("TrinoSessionManager") {
@@ -41,28 +39,13 @@ class TrinoSessionManager
super.initialize(conf)
}
- override def openSession(
+ override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String]): SessionHandle = {
- info(s"Opening session for $user@$ipAddress")
- val sessionImpl =
- new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
-
- try {
- val handle = sessionImpl.handle
- sessionImpl.open()
- setSession(handle, sessionImpl)
- info(s"$user's trino session with $handle is opened, current opening sessions" +
- s" $getOpenSessionCount")
- handle
- } catch {
- case e: Exception =>
- sessionImpl.close()
- throw KyuubiSQLException(e)
- }
+ conf: Map[String, String]): Session = {
+ new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index dfacc9ece..992902c40 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -77,12 +77,39 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
def operationManager: OperationManager
+ protected def createSession(
+ protocol: TProtocolVersion,
+ user: String,
+ password: String,
+ ipAddress: String,
+ conf: Map[String, String]): Session
+
def openSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String]): SessionHandle
+ conf: Map[String, String]): SessionHandle = {
+ info(s"Opening session for $user@$ipAddress")
+ val session = createSession(protocol, user, password, ipAddress, conf)
+ try {
+ val handle = session.handle
+ session.open()
+ setSession(handle, session)
+ info(s"$user's session with $handle is opened, current opening sessions" +
+ s" $getOpenSessionCount")
+ handle
+ } catch {
+ case e: Exception =>
+ try {
+ session.close()
+ } catch {
+ case t: Throwable =>
+ warn(s"Error closing session for $user client ip: $ipAddress", t)
+ }
+ throw KyuubiSQLException(e)
+ }
+ }
def closeSession(sessionHandle: SessionHandle): Unit = {
_latestLogoutTime = System.currentTimeMillis()
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
index 5d14c045d..392759ede 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
@@ -35,18 +35,16 @@ class NoopSessionManager extends SessionManager("noop") {
super.initialize(conf)
}
- override def openSession(
+ override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String]): SessionHandle = {
+ conf: Map[String, String]): Session = {
if (conf.get("kyuubi.test.should.fail").exists(_.toBoolean)) {
throw KyuubiSQLException("Asked to fail")
}
- val session = new NoopSessionImpl(protocol, user, password, ipAddress, conf, this)
- setSession(session.handle, session)
- session.handle
+ new NoopSessionImpl(protocol, user, password, ipAddress, conf, this)
}
override protected def isServer: Boolean = true
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 90a70e65c..4fb3dd9d8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -45,43 +45,39 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
super.initialize(conf)
}
- override def openSession(
+ override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String]): SessionHandle = {
-
- val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+ conf: Map[String, String]): Session = {
// inject client ip into session conf
val newConf = conf + (CLIENT_IP_KEY -> ipAddress)
- val sessionImpl = new KyuubiSessionImpl(
+ new KyuubiSessionImpl(
protocol,
- username,
+ user,
password,
ipAddress,
newConf,
this,
this.getConf.getUserDefaults(user))
+ }
+
+ override def openSession(
+ protocol: TProtocolVersion,
+ user: String,
+ password: String,
+ ipAddress: String,
+ conf: Map[String, String]): SessionHandle = {
+ val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
try {
- sessionImpl.open()
- val handle = sessionImpl.handle
- setSession(handle, sessionImpl)
- info(s"$username's session with $handle is opened, current opening sessions" +
- s" $getOpenSessionCount")
- handle
+ super.openSession(protocol, username, password, ipAddress, conf)
} catch {
case e: Throwable =>
MetricsSystem.tracing { ms =>
ms.incCount(CONN_FAIL)
ms.incCount(MetricRegistry.name(CONN_FAIL, user))
}
- try {
- sessionImpl.close()
- } catch {
- case t: Throwable =>
- warn(s"Error closing session for $username client ip: $ipAddress", t)
- }
throw KyuubiSQLException(
s"Error opening session for $username client ip $ipAddress, due to ${e.getMessage}",
e)