You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/06/06 01:46:25 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2807] Trino, Hive and JDBC Engine support session conf in newExecuteStatementOperation
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new aabc53ec5 [KYUUBI #2807] Trino, Hive and JDBC Engine support session conf in newExecuteStatementOperation
aabc53ec5 is described below
commit aabc53ec550dca23922defe3072759859d369bb2
Author: sychen <sy...@ctrip.com>
AuthorDate: Mon Jun 6 09:46:14 2022 +0800
[KYUUBI #2807] Trino, Hive and JDBC Engine support session conf in newExecuteStatementOperation
### _Why are the changes needed?_
Now Trino, Hive and JDBC Engine use session manager configuration in `newExecuteStatementOperation`, ignoring session level configuration.
Supporting session conf also helps with testing `withSessionConf`.
close #2807
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #2808 from cxzl25/KYUUBI-2807.
Closes #2807
29d6c51e [sychen] support session conf jdbc
7bd560fe [sychen] support session conf trino/hive
ce5e4735 [sychen] closeExpiredOperations add parentheses
2d402fb0 [sychen] trigger test
3e7ae439 [sychen] trigger test
d0f47983 [sychen] sessionKyuubiConf
Authored-by: sychen <sy...@ctrip.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
.../hive/operation/HiveOperationManager.scala | 6 +++++-
.../jdbc/operation/JdbcOperationManager.scala | 7 ++++--
.../trino/operation/TrinoOperationManager.scala | 10 +++++++--
.../trino/operation/TrinoOperationSuite.scala | 25 +++++++++++++++++++++-
.../apache/kyuubi/session/AbstractSession.scala | 2 +-
.../scala/org/apache/kyuubi/session/Session.scala | 2 +-
.../org/apache/kyuubi/session/SessionManager.scala | 2 +-
7 files changed, 45 insertions(+), 9 deletions(-)
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index 8cd7a3112..0762a2938 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session
@@ -37,7 +38,10 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
- if (session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)) {
+ val normalizedConf = session.asInstanceOf[HiveSessionImpl].normalizedConf
+ if (normalizedConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key).map(
+ _.toBoolean).getOrElse(
+ session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED))) {
val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
if (catalogDatabaseOperation != null) {
return catalogDatabaseOperation
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperationManager.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperationManager.scala
index c9700870b..857a19f80 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperationManager.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperationManager.scala
@@ -22,6 +22,7 @@ import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.OPERATION_INCREMENTAL_COLLECT
import org.apache.kyuubi.engine.jdbc.dialect.{JdbcDialect, JdbcDialects}
+import org.apache.kyuubi.engine.jdbc.session.JdbcSessionImpl
import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
import org.apache.kyuubi.operation.{Operation, OperationManager}
import org.apache.kyuubi.session.Session
@@ -39,8 +40,10 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
- val conf = session.sessionManager.getConf
- val incrementalCollect = conf.get(OPERATION_INCREMENTAL_COLLECT)
+ val normalizedConf = session.asInstanceOf[JdbcSessionImpl].normalizedConf
+ val incrementalCollect = normalizedConf.get(OPERATION_INCREMENTAL_COLLECT.key).map(
+ _.toBoolean).getOrElse(
+ session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT))
val executeStatement =
new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect)
addOperation(executeStatement)
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
index 63140bd7f..e00699f9e 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.trino.session.TrinoSessionImpl
import org.apache.kyuubi.operation.{Operation, OperationManager}
import org.apache.kyuubi.session.Session
@@ -34,13 +35,18 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
- if (session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)) {
+ val normalizedConf = session.asInstanceOf[TrinoSessionImpl].normalizedConf
+ if (normalizedConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key).map(
+ _.toBoolean).getOrElse(
+ session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED))) {
val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
if (catalogDatabaseOperation != null) {
return catalogDatabaseOperation
}
}
- val incrementalCollect = session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT)
+ val incrementalCollect = normalizedConf.get(OPERATION_INCREMENTAL_COLLECT.key).map(
+ _.toBoolean).getOrElse(
+ session.sessionManager.getConf.get(OPERATION_INCREMENTAL_COLLECT))
val operation =
new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect)
addOperation(operation)
diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala
index 194672883..af75a8e5c 100644
--- a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala
+++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala
@@ -34,7 +34,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState
import org.apache.hive.service.rpc.thrift.TStatusCode
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, OPERATION_INCREMENTAL_COLLECT}
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.trino.TrinoQueryTests
import org.apache.kyuubi.engine.trino.WithTrinoEngine
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -746,4 +746,27 @@ class TrinoOperationSuite extends WithTrinoEngine with TrinoQueryTests {
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
}
}
+
+ test("trino - set/get catalog with session conf") {
+ Seq(true, false).foreach { enable =>
+ withSessionConf()(
+ Map(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key -> enable.toString))(Map.empty) {
+ withJdbcStatement() { statement =>
+ val catalog = statement.getConnection.getCatalog
+ if (enable) {
+ assert(catalog == "memory")
+ } else {
+ assert(catalog == "")
+ }
+ statement.getConnection.setCatalog("system")
+ val changedCatalog = statement.getConnection.getCatalog
+ if (enable) {
+ assert(changedCatalog == "system")
+ } else {
+ assert(changedCatalog == "")
+ }
+ }
+ }
+ }
+ }
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 5e580d5ef..f02c37aaa 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -234,7 +234,7 @@ abstract class AbstractSession(
}
}
- override def closeExpiredOperations: Unit = {
+ override def closeExpiredOperations(): Unit = {
val operations = sessionManager.operationManager
.removeExpiredOperations(opHandleSet.asScala.toSeq)
operations.foreach { op =>
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index 7e283b670..4097bb7c6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -91,5 +91,5 @@ trait Session {
maxRows: Int,
fetchLog: Boolean): TRowSet
- def closeExpiredOperations: Unit
+ def closeExpiredOperations(): Unit
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 2ed469fee..84c28ad65 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -304,7 +304,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
warn(s"Error closing idle session ${session.handle}", e)
}
} else {
- session.closeExpiredOperations
+ session.closeExpiredOperations()
}
}
}