You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/06/06 01:46:25 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2807] Trino, Hive and JDBC Engine support session conf in newExecuteStatementOperation

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new aabc53ec5 [KYUUBI #2807] Trino, Hive and JDBC Engine support session conf in newExecuteStatementOperation
aabc53ec5 is described below

commit aabc53ec550dca23922defe3072759859d369bb2
Author: sychen <sy...@ctrip.com>
AuthorDate: Mon Jun 6 09:46:14 2022 +0800

    [KYUUBI #2807] Trino, Hive and JDBC Engine support session conf in newExecuteStatementOperation
    
    ### _Why are the changes needed?_
    Currently, the Trino, Hive and JDBC engines read only the session manager configuration in `newExecuteStatementOperation`, ignoring session-level configuration.
    
    Supporting session conf also helps with testing `withSessionConf`.
    
    close #2807
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2808 from cxzl25/KYUUBI-2807.
    
    Closes #2807
    
    29d6c51e [sychen] support session conf jdbc
    7bd560fe [sychen] support session conf trino/hive
    ce5e4735 [sychen] closeExpiredOperations add parentheses
    2d402fb0 [sychen] trigger test
    3e7ae439 [sychen] trigger test
    d0f47983 [sychen] sessionKyuubiConf
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 .../hive/operation/HiveOperationManager.scala      |  6 +++++-
 .../jdbc/operation/JdbcOperationManager.scala      |  7 ++++--
 .../trino/operation/TrinoOperationManager.scala    | 10 +++++++--
 .../trino/operation/TrinoOperationSuite.scala      | 25 +++++++++++++++++++++-
 .../apache/kyuubi/session/AbstractSession.scala    |  2 +-
 .../scala/org/apache/kyuubi/session/Session.scala  |  2 +-
 .../org/apache/kyuubi/session/SessionManager.scala |  2 +-
 7 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index 8cd7a3112..0762a2938 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.service.rpc.thrift.TRowSet
 
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
 import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager}
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.session.Session
@@ -37,7 +38,10 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
       confOverlay: Map[String, String],
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
-    if (session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)) {
+    val normalizedConf = session.asInstanceOf[HiveSessionImpl].normalizedConf
+    if (normalizedConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key).map(
+        _.toBoolean).getOrElse(
+        session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED))) {
       val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
       if (catalogDatabaseOperation != null) {
         return catalogDatabaseOperation
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperationManager.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperationManager.scala
index c9700870b..857a19f80 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperationManager.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperationManager.scala
@@ -22,6 +22,7 @@ import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.OPERATION_INCREMENTAL_COLLECT
 import org.apache.kyuubi.engine.jdbc.dialect.{JdbcDialect, JdbcDialects}
+import org.apache.kyuubi.engine.jdbc.session.JdbcSessionImpl
 import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
 import org.apache.kyuubi.operation.{Operation, OperationManager}
 import org.apache.kyuubi.session.Session
@@ -39,8 +40,10 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
       confOverlay: Map[String, String],
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
-    val conf = session.sessionManager.getConf
-    val incrementalCollect = conf.get(OPERATION_INCREMENTAL_COLLECT)
+    val normalizedConf = session.asInstanceOf[JdbcSessionImpl].normalizedConf
+    val incrementalCollect = normalizedConf.get(OPERATION_INCREMENTAL_COLLECT.key).map(
+      _.toBoolean).getOrElse(
+      session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT))
     val executeStatement =
       new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect)
     addOperation(executeStatement)
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
index 63140bd7f..e00699f9e 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.trino.session.TrinoSessionImpl
 import org.apache.kyuubi.operation.{Operation, OperationManager}
 import org.apache.kyuubi.session.Session
 
@@ -34,13 +35,18 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
       confOverlay: Map[String, String],
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
-    if (session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)) {
+    val normalizedConf = session.asInstanceOf[TrinoSessionImpl].normalizedConf
+    if (normalizedConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key).map(
+        _.toBoolean).getOrElse(
+        session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED))) {
       val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
       if (catalogDatabaseOperation != null) {
         return catalogDatabaseOperation
       }
     }
-    val incrementalCollect = session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT)
+    val incrementalCollect = normalizedConf.get(OPERATION_INCREMENTAL_COLLECT.key).map(
+      _.toBoolean).getOrElse(
+      session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT))
     val operation =
       new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect)
     addOperation(operation)
diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala
index 194672883..af75a8e5c 100644
--- a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala
+++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala
@@ -34,7 +34,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState
 import org.apache.hive.service.rpc.thrift.TStatusCode
 
 import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, OPERATION_INCREMENTAL_COLLECT}
+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.trino.TrinoQueryTests
 import org.apache.kyuubi.engine.trino.WithTrinoEngine
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -746,4 +746,27 @@ class TrinoOperationSuite extends WithTrinoEngine with TrinoQueryTests {
       assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
     }
   }
+
+  test("trino - set/get catalog with session conf") {
+    Seq(true, false).foreach { enable =>
+      withSessionConf()(
+        Map(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key -> enable.toString))(Map.empty) {
+        withJdbcStatement() { statement =>
+          val catalog = statement.getConnection.getCatalog
+          if (enable) {
+            assert(catalog == "memory")
+          } else {
+            assert(catalog == "")
+          }
+          statement.getConnection.setCatalog("system")
+          val changedCatalog = statement.getConnection.getCatalog
+          if (enable) {
+            assert(changedCatalog == "system")
+          } else {
+            assert(changedCatalog == "")
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 5e580d5ef..f02c37aaa 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -234,7 +234,7 @@ abstract class AbstractSession(
     }
   }
 
-  override def closeExpiredOperations: Unit = {
+  override def closeExpiredOperations(): Unit = {
     val operations = sessionManager.operationManager
       .removeExpiredOperations(opHandleSet.asScala.toSeq)
     operations.foreach { op =>
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index 7e283b670..4097bb7c6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -91,5 +91,5 @@ trait Session {
       maxRows: Int,
       fetchLog: Boolean): TRowSet
 
-  def closeExpiredOperations: Unit
+  def closeExpiredOperations(): Unit
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 2ed469fee..84c28ad65 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -304,7 +304,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
                   warn(s"Error closing idle session ${session.handle}", e)
               }
             } else {
-              session.closeExpiredOperations
+              session.closeExpiredOperations()
             }
           }
         }