You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/04/13 10:11:09 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2331] Add createSession method to further abstract openSession

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a525e520 [KYUUBI #2331] Add createSession method to further abstract openSession
8a525e520 is described below

commit 8a525e520383278e88c7ffcef87931dc1601253c
Author: wforget <64...@qq.com>
AuthorDate: Wed Apr 13 18:10:59 2022 +0800

    [KYUUBI #2331] Add createSession method to further abstract openSession
    
    ### _Why are the changes needed?_
    
    close #2331
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #2332 from wForget/KYUUBI-2331.
    
    Closes #2331
    
    ce7f32e8 [wforget] fix
    b74efd80 [wforget] [KYUUBI-2331] Add createSession method to further abstract openSession
    
    Authored-by: wforget <64...@qq.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 .../flink/session/FlinkSQLSessionManager.scala     | 21 +++-----------
 .../engine/hive/session/HiveSessionManager.scala   | 21 +++-----------
 .../spark/session/SparkSQLSessionManager.scala     | 19 ++-----------
 .../engine/trino/session/TrinoSessionManager.scala | 25 +++--------------
 .../org/apache/kyuubi/session/SessionManager.scala | 29 +++++++++++++++++++-
 .../apache/kyuubi/session/NoopSessionManager.scala |  8 ++----
 .../kyuubi/session/KyuubiSessionManager.scala      | 32 ++++++++++------------
 7 files changed, 60 insertions(+), 95 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 4e4f49001..b073fc9cb 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -21,10 +21,9 @@ import org.apache.flink.table.client.gateway.context.DefaultContext
 import org.apache.flink.table.client.gateway.local.LocalExecutor
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
-import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.engine.flink.FlinkEngineUtils
 import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
-import org.apache.kyuubi.session.{SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
 
 class FlinkSQLSessionManager(engineContext: DefaultContext)
   extends SessionManager("FlinkSQLSessionManager") {
@@ -39,12 +38,12 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
     executor.start()
   }
 
-  override def openSession(
+  override protected def createSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
       ipAddress: String,
-      conf: Map[String, String]): SessionHandle = {
+      conf: Map[String, String]): Session = {
 
     val sessionHandle = SessionHandle(protocol)
     val sessionId = sessionHandle.identifier.toString
@@ -52,7 +51,7 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
     executor.openSession(sessionId)
     val sessionContext = FlinkEngineUtils.getSessionContext(executor, sessionId)
 
-    val sessionImpl = new FlinkSessionImpl(
+    new FlinkSessionImpl(
       protocol,
       user,
       password,
@@ -61,18 +60,6 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
       this,
       sessionHandle,
       sessionContext)
-
-    try {
-      sessionImpl.open()
-      setSession(sessionHandle, sessionImpl)
-      info(s"$user's session with $sessionHandle is opened, current opening sessions" +
-        s" $getOpenSessionCount")
-      sessionHandle
-    } catch {
-      case e: Exception =>
-        sessionImpl.close()
-        throw KyuubiSQLException(e)
-    }
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
index 2c74a735b..5f23f74a7 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionManager.scala
@@ -25,13 +25,12 @@ import org.apache.hive.service.cli.{SessionHandle => ImportedSessionHandle}
 import org.apache.hive.service.cli.session.{HiveSessionImpl => ImportedHiveSessionImpl, SessionManager => ImportedHiveSessionManager}
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
-import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.hive.HiveSQLEngine
 import org.apache.kyuubi.engine.hive.operation.HiveOperationManager
 import org.apache.kyuubi.operation.OperationManager
-import org.apache.kyuubi.session.{CLIENT_IP_KEY, SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{CLIENT_IP_KEY, Session, SessionHandle, SessionManager}
 
 class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSessionManager") {
   override protected def isServer: Boolean = false
@@ -65,15 +64,14 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
     }
   }
 
-  override def openSession(
+  override protected def createSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
       ipAddress: String,
-      conf: Map[String, String]): SessionHandle = {
+      conf: Map[String, String]): Session = {
     val sessionHandle = SessionHandle(protocol)
     val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
-    info(s"Opening session for $user@$clientIp")
     val hive = new ImportedHiveSessionImpl(
       new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
       protocol,
@@ -85,7 +83,7 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
     hive.setSessionManager(internalSessionManager)
     hive.setOperationManager(internalSessionManager.getOperationManager)
     operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
-    val session = new HiveSessionImpl(
+    new HiveSessionImpl(
       protocol,
       user,
       password,
@@ -95,17 +93,6 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
       this,
       sessionHandle,
       hive)
-    try {
-      session.open()
-      setSession(sessionHandle, session)
-      info(s"$user's session with $sessionHandle is opened, current opening sessions" +
-        s" $getOpenSessionCount")
-      sessionHandle
-    } catch {
-      case e: Exception =>
-        session.close()
-        throw KyuubiSQLException(e)
-    }
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index fc039d851..4e11f9bfd 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -51,14 +51,13 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
 
   private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
 
-  override def openSession(
+  override protected def createSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
       ipAddress: String,
-      conf: Map[String, String]): SessionHandle = {
+      conf: Map[String, String]): Session = {
     val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
-    info(s"Opening session for $user@$clientIp")
     val sparkSession =
       try {
         if (singleSparkSession) {
@@ -80,7 +79,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
         case e: Exception => throw KyuubiSQLException(e)
       }
 
-    val session = new SparkSessionImpl(
+    new SparkSessionImpl(
       protocol,
       user,
       password,
@@ -89,18 +88,6 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
       conf,
       this,
       sparkSession)
-    try {
-      val handle = session.handle
-      session.open()
-      setSession(handle, session)
-      info(s"$user's session with $handle is opened, current opening sessions" +
-        s" $getOpenSessionCount")
-      handle
-    } catch {
-      case e: Exception =>
-        session.close()
-        throw KyuubiSQLException(e)
-    }
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
index 83af04a03..aa1823bc7 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionManager.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.trino.session
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
-import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_LOG_DIR_ROOT
@@ -27,8 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.trino.TrinoSqlEngine
 import org.apache.kyuubi.engine.trino.operation.TrinoOperationManager
-import org.apache.kyuubi.session.SessionHandle
-import org.apache.kyuubi.session.SessionManager
+import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
 
 class TrinoSessionManager
   extends SessionManager("TrinoSessionManager") {
@@ -41,28 +39,13 @@ class TrinoSessionManager
     super.initialize(conf)
   }
 
-  override def openSession(
+  override protected def createSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
       ipAddress: String,
-      conf: Map[String, String]): SessionHandle = {
-    info(s"Opening session for $user@$ipAddress")
-    val sessionImpl =
-      new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
-
-    try {
-      val handle = sessionImpl.handle
-      sessionImpl.open()
-      setSession(handle, sessionImpl)
-      info(s"$user's trino session with $handle is opened, current opening sessions" +
-        s" $getOpenSessionCount")
-      handle
-    } catch {
-      case e: Exception =>
-        sessionImpl.close()
-        throw KyuubiSQLException(e)
-    }
+      conf: Map[String, String]): Session = {
+    new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index dfacc9ece..992902c40 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -77,12 +77,39 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
 
   def operationManager: OperationManager
 
+  protected def createSession(
+      protocol: TProtocolVersion,
+      user: String,
+      password: String,
+      ipAddress: String,
+      conf: Map[String, String]): Session
+
   def openSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
       ipAddress: String,
-      conf: Map[String, String]): SessionHandle
+      conf: Map[String, String]): SessionHandle = {
+    info(s"Opening session for $user@$ipAddress")
+    val session = createSession(protocol, user, password, ipAddress, conf)
+    try {
+      val handle = session.handle
+      session.open()
+      setSession(handle, session)
+      info(s"$user's session with $handle is opened, current opening sessions" +
+        s" $getOpenSessionCount")
+      handle
+    } catch {
+      case e: Exception =>
+        try {
+          session.close()
+        } catch {
+          case t: Throwable =>
+            warn(s"Error closing session for $user client ip: $ipAddress", t)
+        }
+        throw KyuubiSQLException(e)
+    }
+  }
 
   def closeSession(sessionHandle: SessionHandle): Unit = {
     _latestLogoutTime = System.currentTimeMillis()
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
index 5d14c045d..392759ede 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
@@ -35,18 +35,16 @@ class NoopSessionManager extends SessionManager("noop") {
     super.initialize(conf)
   }
 
-  override def openSession(
+  override protected def createSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
       ipAddress: String,
-      conf: Map[String, String]): SessionHandle = {
+      conf: Map[String, String]): Session = {
     if (conf.get("kyuubi.test.should.fail").exists(_.toBoolean)) {
       throw KyuubiSQLException("Asked to fail")
     }
-    val session = new NoopSessionImpl(protocol, user, password, ipAddress, conf, this)
-    setSession(session.handle, session)
-    session.handle
+    new NoopSessionImpl(protocol, user, password, ipAddress, conf, this)
   }
 
   override protected def isServer: Boolean = true
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 90a70e65c..4fb3dd9d8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -45,43 +45,39 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
     super.initialize(conf)
   }
 
-  override def openSession(
+  override protected def createSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
       ipAddress: String,
-      conf: Map[String, String]): SessionHandle = {
-
-    val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+      conf: Map[String, String]): Session = {
     // inject client ip into session conf
     val newConf = conf + (CLIENT_IP_KEY -> ipAddress)
-    val sessionImpl = new KyuubiSessionImpl(
+    new KyuubiSessionImpl(
       protocol,
-      username,
+      user,
       password,
       ipAddress,
       newConf,
       this,
       this.getConf.getUserDefaults(user))
+  }
+
+  override def openSession(
+      protocol: TProtocolVersion,
+      user: String,
+      password: String,
+      ipAddress: String,
+      conf: Map[String, String]): SessionHandle = {
+    val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
     try {
-      sessionImpl.open()
-      val handle = sessionImpl.handle
-      setSession(handle, sessionImpl)
-      info(s"$username's session with $handle is opened, current opening sessions" +
-        s" $getOpenSessionCount")
-      handle
+      super.openSession(protocol, username, password, ipAddress, conf)
     } catch {
       case e: Throwable =>
         MetricsSystem.tracing { ms =>
           ms.incCount(CONN_FAIL)
           ms.incCount(MetricRegistry.name(CONN_FAIL, user))
         }
-        try {
-          sessionImpl.close()
-        } catch {
-          case t: Throwable =>
-            warn(s"Error closing session for $username client ip: $ipAddress", t)
-        }
         throw KyuubiSQLException(
           s"Error opening session for $username client ip $ipAddress, due to ${e.getMessage}",
           e)