You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/08 02:21:18 UTC
[incubator-kyuubi] branch branch-1.6 updated (fd16dd740 -> d9f1d0b25)
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a change to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
from fd16dd740 [KYUUBI #3064] Fix scala NPE issue when adding non-local jar URI to class loader
new 5589406c9 [KYUUBI #3318] Transfer the TGetInfoReq to kyuubi engine side to check the connection valid and keep connection alive
new d9f1d0b25 [KYUUBI #3122] GetInfo supports return server/engine info
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
docs/deployment/settings.md | 1 +
.../engine/flink/session/FlinkSessionImpl.scala | 13 ++-
.../engine/hive/session/HiveSessionImpl.scala | 17 ++-
.../engine/spark/session/SparkSessionImpl.scala | 18 +++-
.../spark/operation/SparkOperationSuite.scala | 13 +++
.../it/flink/operation/FlinkOperationSuite.scala | 24 +++++
.../KyuubiOperationHiveEnginePerUserSuite.scala | 24 +++++
.../org/apache/kyuubi/config/KyuubiConf.scala | 10 ++
.../apache/kyuubi/session/AbstractSession.scala | 12 +--
.../kyuubi/operation/SparkMetadataTests.scala | 27 ++---
.../kyuubi/service/TFrontendServiceSuite.scala | 9 +-
.../kyuubi/client/KyuubiSyncThriftClient.scala | 7 ++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 11 ++
.../KyuubiOperationPerConnectionSuite.scala | 25 +++++
.../operation/KyuubiOperationPerUserSuite.scala | 119 +++++++++++++--------
15 files changed, 254 insertions(+), 76 deletions(-)
[incubator-kyuubi] 01/02: [KYUUBI #3318] Transfer the TGetInfoReq to kyuubi engine side to check the connection valid and keep connection alive
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
commit 5589406c9dc08fd41044c81e4d154b576b4fda29
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Wed Aug 24 17:35:52 2022 +0800
[KYUUBI #3318] Transfer the TGetInfoReq to kyuubi engine side to check the connection valid and keep connection alive
### _Why are the changes needed?_
Now the connection path is:
`client`-> `kyuubiServer` -> `kyuubiEngine`
Maybe the connection between `client -> kyuubiServer` is valid, but the connection between `kyuubiServer -> kyuubiEngine` is not.
So we need to check the whole path.
BTW, previously, when a customer invoked KyuubiConnection::isValid, it only kept the connection between client and kyuubiServer alive.
Now, it keeps the whole connection path alive.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #3318 from turboFei/is_valid.
Closes #3318
af4d6b7b [Fei Wang] Transfer the TGetInfoReq to kyuubi engine side to keep the kyuubi connection alive
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../apache/kyuubi/session/AbstractSession.scala | 2 +-
.../kyuubi/client/KyuubiSyncThriftClient.scala | 7 +++++++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 10 ++++++++++
.../KyuubiOperationPerConnectionSuite.scala | 23 ++++++++++++++++++++++
4 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index b67690386..64d691b27 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -75,7 +75,7 @@ abstract class AbstractSession(
}
}
- private def withAcquireRelease[T](userAccess: Boolean = true)(f: => T): T = {
+ protected def withAcquireRelease[T](userAccess: Boolean = true)(f: => T): T = {
acquire(userAccess)
try f
finally release(userAccess)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 834c2c0c9..4420916e2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -227,6 +227,13 @@ class KyuubiSyncThriftClient private (
resp.getOperationHandle
}
+ def getInfo(infoType: TGetInfoType): TGetInfoResp = {
+ val req = new TGetInfoReq(_remoteSessionHandle, infoType)
+ val resp = withLockAcquiredAsyncRequest(GetInfo(req))
+ ThriftUtils.verifyTStatus(resp.getStatus)
+ resp
+ }
+
def getTypeInfo: TOperationHandle = {
val req = new TGetTypeInfoReq(_remoteSessionHandle)
val resp = withLockAcquiredAsyncRequest(GetTypeInfo(req))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index e515d7146..1b1694e15 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -197,4 +197,14 @@ class KyuubiSessionImpl(
MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
}
}
+
+ override def getInfo(infoType: TGetInfoType): TGetInfoValue = {
+ if (client != null) {
+ withAcquireRelease() {
+ client.getInfo(infoType).getInfoValue
+ }
+ } else {
+ super.getInfo(infoType)
+ }
+ }
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 5cc11bf42..4f78c7f42 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -29,9 +29,11 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.WithKyuubiServer
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
+import org.apache.kyuubi.engine.ApplicationState
import org.apache.kyuubi.jdbc.KyuubiHiveDriver
import org.apache.kyuubi.jdbc.hive.KyuubiConnection
import org.apache.kyuubi.plugin.SessionConfAdvisor
+import org.apache.kyuubi.session.KyuubiSessionManager
/**
* UT with Connection level engine shared cost much time, only run basic jdbc tests.
@@ -210,6 +212,27 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
assert(kyuubiConnection.isClosed)
}
}
+
+ test("transfer the TGetInfoReq to kyuubi engine side to verify the connection valid") {
+ withSessionConf(Map.empty)(Map(KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"))() {
+ withJdbcStatement() { statement =>
+ val conn = statement.getConnection.asInstanceOf[KyuubiConnection]
+ assert(conn.isValid(3000))
+ val sessionManager = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
+ eventually(timeout(10.seconds)) {
+ assert(sessionManager.allSessions().size === 1)
+ }
+ val engineId = sessionManager.allSessions().head.handle.identifier.toString
+ // kill the engine application and wait the engine terminate
+ sessionManager.applicationManager.killApplication(None, engineId)
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
+ assert(sessionManager.applicationManager.getApplicationInfo(None, engineId)
+ .exists(_.state == ApplicationState.NOT_FOUND))
+ }
+ assert(!conn.isValid(3000))
+ }
+ }
+ }
}
class TestSessionConfAdvisor extends SessionConfAdvisor {
[incubator-kyuubi] 02/02: [KYUUBI #3122] GetInfo supports return server/engine info
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
commit d9f1d0b25fbb0d0c9fa7c06e004cb03bd67bf551
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Thu Sep 8 02:14:38 2022 +0000
[KYUUBI #3122] GetInfo supports return server/engine info
### _Why are the changes needed?_
Workaround for #3032, close #3323
Some ODBC drivers, e.g. the [Databricks ODBC driver](https://www.databricks.com/spark/odbc-drivers-download), are known to depend on `TGetInfoType.CLI_DBMS_VER` and `TGetInfoType.CLI_DBMS_NAME` to check server compatibilities and abilities.
This PR proposes to introduce a new configuration `kyuubi.server.info.provider=SERVER/ENGINE` to make GetInfo support returning either server or engine information.
Since beeline calls GetInfo in the initialization phase, to preserve the fast-open experience of beeline in async launch mode, server info is returned when the engine is not ready, regardless of `kyuubi.server.info.provider`.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
Testing w/ PowerBI
![image](https://user-images.githubusercontent.com/26535726/188945975-0d0fc95c-f989-4025-ad7d-c024e23ec328.png)
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #3122 from pan3793/info.
Closes #3122
742bdbe3 [Cheng Pan] nit
bb85d2ba [Cheng Pan] style
fd75238c [Cheng Pan] fix
9ddb2afd [Cheng Pan] nit
fd8f7979 [Cheng Pan] fix ut
840205e5 [Cheng Pan] nit
f9996d53 [Cheng Pan] GetInfo supports returning engine info
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
docs/deployment/settings.md | 1 +
.../engine/flink/session/FlinkSessionImpl.scala | 13 ++-
.../engine/hive/session/HiveSessionImpl.scala | 17 ++-
.../engine/spark/session/SparkSessionImpl.scala | 18 +++-
.../spark/operation/SparkOperationSuite.scala | 13 +++
.../it/flink/operation/FlinkOperationSuite.scala | 24 +++++
.../KyuubiOperationHiveEnginePerUserSuite.scala | 24 +++++
.../org/apache/kyuubi/config/KyuubiConf.scala | 10 ++
.../apache/kyuubi/session/AbstractSession.scala | 10 +-
.../kyuubi/operation/SparkMetadataTests.scala | 27 ++---
.../kyuubi/service/TFrontendServiceSuite.scala | 9 +-
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 13 +--
.../KyuubiOperationPerConnectionSuite.scala | 4 +-
.../operation/KyuubiOperationPerUserSuite.scala | 119 +++++++++++++--------
14 files changed, 220 insertions(+), 82 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 6513ed012..6b7811500 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -416,6 +416,7 @@ kyuubi.operation.status.polling.timeout|PT5S|Timeout(ms) for long polling asynch
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
+kyuubi.server.info.provider|SERVER|The server information provider name, some clients may rely on this information to check the server compatibilities and functionalities. <li>SERVER: Return Kyuubi server information.</li> <li>ENGINE: Return Kyuubi engine information.</li>|string|1.6.1
kyuubi.server.limit.connections.per.ipaddress|<undefined>|Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect.|int|1.6.0
kyuubi.server.limit.connections.per.user|<undefined>|Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect.|int|1.6.0
kyuubi.server.limit.connections.per.user.ipaddress|<undefined>|Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect.|int|1.6.0
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 99d223f20..7c6107ee3 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -19,11 +19,13 @@ package org.apache.kyuubi.engine.flink.session
import scala.util.control.NonFatal
+import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.table.client.gateway.SqlExecutionException
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
@@ -66,4 +68,13 @@ class FlinkSessionImpl(
}
super.open()
}
+
+ override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() {
+ infoType match {
+ case TGetInfoType.CLI_SERVER_NAME | TGetInfoType.CLI_DBMS_NAME =>
+ TGetInfoValue.stringValue("Apache Flink")
+ case TGetInfoType.CLI_DBMS_VER => TGetInfoValue.stringValue(EnvironmentInformation.getVersion)
+ case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType")
+ }
+ }
}
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
index f59a69cba..3b85f94df 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/session/HiveSessionImpl.scala
@@ -21,9 +21,11 @@ import java.util.HashMap
import scala.collection.JavaConverters._
+import org.apache.hive.common.util.HiveVersionInfo
import org.apache.hive.service.cli.session.HiveSession
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.hive.events.HiveSessionEvent
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
@@ -54,6 +56,19 @@ class HiveSessionImpl(
super.runOperation(operation)
}
+ override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() {
+ infoType match {
+ case TGetInfoType.CLI_SERVER_NAME => TGetInfoValue.stringValue("Hive")
+ case TGetInfoType.CLI_DBMS_NAME => TGetInfoValue.stringValue("Apache Hive")
+ case TGetInfoType.CLI_DBMS_VER => TGetInfoValue.stringValue(HiveVersionInfo.getVersion)
+ case TGetInfoType.CLI_ODBC_KEYWORDS => TGetInfoValue.stringValue("Unimplemented")
+ case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN |
+ TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN |
+ TGetInfoType.CLI_MAX_TABLE_NAME_LEN => TGetInfoValue.lenValue(128)
+ case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType")
+ }
+ }
+
override def close(): Unit = {
sessionEvent.endTime = System.currentTimeMillis()
EventBus.post(sessionEvent)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 6abbf4266..eda7028cb 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -17,9 +17,10 @@
package org.apache.kyuubi.engine.spark.session
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.spark.events.SessionEvent
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
@@ -75,11 +76,24 @@ class SparkSessionImpl(
super.runOperation(operation)
}
+ override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() {
+ infoType match {
+ case TGetInfoType.CLI_SERVER_NAME | TGetInfoType.CLI_DBMS_NAME =>
+ TGetInfoValue.stringValue("Spark SQL")
+ case TGetInfoType.CLI_DBMS_VER => TGetInfoValue.stringValue(org.apache.spark.SPARK_VERSION)
+ case TGetInfoType.CLI_ODBC_KEYWORDS => TGetInfoValue.stringValue("Unimplemented")
+ case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN |
+ TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN |
+ TGetInfoType.CLI_MAX_TABLE_NAME_LEN => TGetInfoValue.lenValue(128)
+ case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType")
+ }
+ }
+
override def close(): Unit = {
sessionEvent.endTime = System.currentTimeMillis()
EventBus.post(sessionEvent)
super.close()
- spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable(_))
+ spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable)
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index d1ab4856f..2ac0235c6 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -26,10 +26,12 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hive.service.rpc.thrift._
+import org.apache.spark.SPARK_VERSION
import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.types._
+import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
@@ -55,6 +57,17 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
}
}
+ test("audit Spark engine MetaData") {
+ withJdbcStatement() { statement =>
+ val metaData = statement.getConnection.getMetaData
+ assert(metaData.getDatabaseProductName === "Spark SQL")
+ assert(metaData.getDatabaseProductVersion === SPARK_VERSION)
+ val ver = SemanticVersion(SPARK_VERSION)
+ assert(metaData.getDatabaseMajorVersion === ver.majorVersion)
+ assert(metaData.getDatabaseMinorVersion === ver.minorVersion)
+ }
+ }
+
test("get columns operation") {
val tableName = "spark_get_col_operation"
var schema = new StructType()
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
index fa4fb6ad9..b33669702 100644
--- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.it.flink.operation
+import org.apache.hive.service.rpc.thrift.{TGetInfoReq, TGetInfoType}
+
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.it.flink.WithKyuubiServerAndFlinkMiniCluster
@@ -69,4 +71,26 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
assert(success)
}
}
+
+ test("server info provider - server") {
+ withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
+ withSessionHandle { (client, handle) =>
+ val req = new TGetInfoReq()
+ req.setSessionHandle(handle)
+ req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+ assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Kyuubi (Incubating)")
+ }
+ }
+ }
+
+ test("server info provider - engine") {
+ withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
+ withSessionHandle { (client, handle) =>
+ val req = new TGetInfoReq()
+ req.setSessionHandle(handle)
+ req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+ assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Flink")
+ }
+ }
+ }
}
diff --git a/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
index 613cf185c..de4b9f5cb 100644
--- a/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
+++ b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEnginePerUserSuite.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.it.hive.operation
+import org.apache.hive.service.rpc.thrift.{TGetInfoReq, TGetInfoType}
+
import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServer}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
@@ -37,4 +39,26 @@ class KyuubiOperationHiveEnginePerUserSuite extends WithKyuubiServer with HiveEn
}
override protected def jdbcUrl: String = getJdbcUrl
+
+ test("server info provider - server") {
+ withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
+ withSessionHandle { (client, handle) =>
+ val req = new TGetInfoReq()
+ req.setSessionHandle(handle)
+ req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+ assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Kyuubi (Incubating)")
+ }
+ }
+ }
+
+ test("server info provider - engine") {
+ withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
+ withSessionHandle { (client, handle) =>
+ val req = new TGetInfoReq()
+ req.setSessionHandle(handle)
+ req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+ assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Hive")
+ }
+ }
+ }
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 62590d627..0c5c4c63f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1691,6 +1691,16 @@ object KyuubiConf {
.stringConf
.createOptional
+ val SERVER_INFO_PROVIDER: ConfigEntry[String] =
+ buildConf("kyuubi.server.info.provider")
+ .doc("The server information provider name, some clients may rely on this information" +
+ " to check the server compatibilities and functionalities." +
+ " <li>SERVER: Return Kyuubi server information.</li>" +
+ " <li>ENGINE: Return Kyuubi engine information.</li>")
+ .version("1.6.1")
+ .stringConf
+ .createWithDefault("SERVER")
+
val ENGINE_SPARK_SHOW_PROGRESS: ConfigEntry[Boolean] =
buildConf("kyuubi.session.engine.spark.showProgress")
.doc("When true, show the progress bar in the spark engine log.")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 64d691b27..96a4ee4a6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -19,10 +19,10 @@ package org.apache.kyuubi.session
import scala.collection.JavaConverters._
-import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
+import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_CLIENT_IP_KEY
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -55,7 +55,7 @@ abstract class AbstractSession(
val normalizedConf: Map[String, String] = sessionManager.validateAndNormalizeConf(conf)
- override lazy val name: Option[String] = normalizedConf.get(KyuubiConf.SESSION_NAME.key)
+ override lazy val name: Option[String] = normalizedConf.get(SESSION_NAME.key)
final private val opHandleSet = new java.util.HashSet[OperationHandle]
@@ -108,8 +108,8 @@ abstract class AbstractSession(
override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() {
infoType match {
- case TGetInfoType.CLI_SERVER_NAME => TGetInfoValue.stringValue("Apache Kyuubi (Incubating)")
- case TGetInfoType.CLI_DBMS_NAME => TGetInfoValue.stringValue("Apache Kyuubi (Incubating)")
+ case TGetInfoType.CLI_SERVER_NAME | TGetInfoType.CLI_DBMS_NAME =>
+ TGetInfoValue.stringValue("Apache Kyuubi (Incubating)")
case TGetInfoType.CLI_DBMS_VER => TGetInfoValue.stringValue(org.apache.kyuubi.KYUUBI_VERSION)
case TGetInfoType.CLI_ODBC_KEYWORDS => TGetInfoValue.stringValue("Unimplemented")
case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN |
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
index b0c1ed759..b9f6d5823 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
@@ -22,7 +22,6 @@ import java.sql.{DatabaseMetaData, ResultSet, SQLException, SQLFeatureNotSupport
import scala.util.Random
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException}
-import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
// For both `in-memory` and `hive` external catalog
@@ -294,7 +293,7 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {
}
}
- test("audit Kyuubi Hive JDBC connection MetaData") {
+ test("audit Kyuubi Hive JDBC connection common MetaData") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
Seq(
@@ -405,13 +404,8 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {
assert(metaData.allTablesAreSelectable)
assert(metaData.getClientInfoProperties.next)
- assert(metaData.getDatabaseProductName === "Apache Kyuubi (Incubating)")
- assert(metaData.getDatabaseProductVersion === KYUUBI_VERSION)
assert(metaData.getDriverName === "Kyuubi Project Hive JDBC Shaded Client")
assert(metaData.getDriverVersion === KYUUBI_VERSION)
- val ver = SemanticVersion(KYUUBI_VERSION)
- assert(metaData.getDatabaseMajorVersion === ver.majorVersion)
- assert(metaData.getDatabaseMinorVersion === ver.minorVersion)
assert(
metaData.getIdentifierQuoteString === " ",
"This method returns a space \" \" if identifier quoting is not supported")
@@ -455,21 +449,16 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {
assert(metaData.getDefaultTransactionIsolation === java.sql.Connection.TRANSACTION_NONE)
assert(!metaData.supportsTransactions)
assert(!metaData.getProcedureColumns("", "%", "%", "%").next())
- try {
- assert(!metaData.getPrimaryKeys("", "default", "src").next())
- } catch {
- case e: Exception =>
- assert(e.isInstanceOf[SQLException])
- assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
+ val e1 = intercept[SQLException] {
+ metaData.getPrimaryKeys("", "default", "src").next()
}
+ assert(e1.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
assert(!metaData.getImportedKeys("", "default", "").next())
- try {
- assert(!metaData.getCrossReference("", "default", "src", "", "default", "src2").next())
- } catch {
- case e: Exception =>
- assert(e.isInstanceOf[SQLException])
- assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
+
+ val e2 = intercept[SQLException] {
+ metaData.getCrossReference("", "default", "src", "", "default", "src2").next()
}
+ assert(e2.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
assert(!metaData.getIndexInfo("", "default", "src", true, true).next())
assert(metaData.supportsResultSetType(new Random().nextInt()))
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
index 1e730abc6..9972c4f60 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.time._
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_BIND_HOST, FRONTEND_CONNECTION_URL_USE_HOSTNAME, FRONTEND_THRIFT_BINARY_BIND_HOST, FRONTEND_THRIFT_BINARY_BIND_PORT}
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.operation.{OperationHandle, TClientTestUtils}
import org.apache.kyuubi.service.TFrontendService.FeServiceServerContext
import org.apache.kyuubi.session.{AbstractSession, SessionHandle}
@@ -34,7 +34,7 @@ import org.apache.kyuubi.session.{AbstractSession, SessionHandle}
class TFrontendServiceSuite extends KyuubiFunSuite {
protected val server = new NoopTBinaryFrontendServer()
- protected val conf = KyuubiConf()
+ protected val conf: KyuubiConf = KyuubiConf()
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
.set("kyuubi.test.server.should.fail", "false")
.set(KyuubiConf.SESSION_CHECK_INTERVAL, Duration.ofSeconds(5).toMillis)
@@ -54,8 +54,8 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
}
override def afterAll(): Unit = {
- server.getServices.foreach(_.stop())
super.afterAll()
+ server.getServices.foreach(_.stop())
}
private def checkOperationResult(
@@ -166,8 +166,7 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
val req = new TGetInfoReq()
req.setSessionHandle(handle)
req.setInfoType(TGetInfoType.CLI_DBMS_VER)
- val resp = client.GetInfo(req)
- assert(resp.getInfoValue.getStringValue === org.apache.kyuubi.KYUUBI_VERSION)
+ assert(client.GetInfo(req).getInfoValue.getStringValue === org.apache.kyuubi.KYUUBI_VERSION)
req.setInfoType(TGetInfoType.CLI_SERVER_NAME)
assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Kyuubi (Incubating)")
req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 1b1694e15..edbb5c45e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -199,12 +199,13 @@ class KyuubiSessionImpl(
}
override def getInfo(infoType: TGetInfoType): TGetInfoValue = {
- if (client != null) {
- withAcquireRelease() {
- client.getInfo(infoType).getInfoValue
- }
- } else {
- super.getInfo(infoType)
+ sessionConf.get(SERVER_INFO_PROVIDER) match {
+ case "SERVER" => super.getInfo(infoType)
+ case "ENGINE" => withAcquireRelease() {
+ waitForEngineLaunched()
+ client.getInfo(infoType).getInfoValue
+ }
+ case unknown => throw new IllegalArgumentException(s"Unknown server info provider $unknown")
}
}
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 4f78c7f42..db23bf8c0 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -214,7 +214,9 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
}
test("transfer the TGetInfoReq to kyuubi engine side to verify the connection valid") {
- withSessionConf(Map.empty)(Map(KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"))() {
+ withSessionConf(Map.empty)(Map(
+ KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE",
+ KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"))() {
withJdbcStatement() { statement =>
val conn = statement.getConnection.asInstanceOf[KyuubiConnection]
assert(conn.isValid(3000))
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index c2f1be1d4..0cb38a1cc 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -20,12 +20,13 @@ package org.apache.kyuubi.operation
import java.util.UUID
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
-import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TStatusCode}
+import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TGetInfoReq, TGetInfoType, TStatusCode}
import org.scalatest.time.SpanSugar._
-import org.apache.kyuubi.{Utils, WithKyuubiServer, WithSimpleDFSService}
+import org.apache.kyuubi.{KYUUBI_VERSION, Utils, WithKyuubiServer, WithSimpleDFSService}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
+import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle}
@@ -43,6 +44,17 @@ class KyuubiOperationPerUserSuite
conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", getHadoopConfDir)
}
+ test("audit Kyuubi server MetaData") {
+ withJdbcStatement() { statement =>
+ val metaData = statement.getConnection.getMetaData
+ assert(metaData.getDatabaseProductName === "Apache Kyuubi (Incubating)")
+ assert(metaData.getDatabaseProductVersion === KYUUBI_VERSION)
+ val ver = SemanticVersion(KYUUBI_VERSION)
+ assert(metaData.getDatabaseMajorVersion === ver.majorVersion)
+ assert(metaData.getDatabaseMinorVersion === ver.minorVersion)
+ }
+ }
+
test("kyuubi defined function - system_user/session_user") {
withJdbcStatement() { statement =>
val rs = statement.executeQuery("SELECT system_user(), session_user()")
@@ -168,46 +180,45 @@ class KyuubiOperationPerUserSuite
}
test("support to interrupt the thrift request if remote engine is broken") {
- if (!httpMode) {
- withSessionConf(Map(
- KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
- KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
- KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
- Map.empty) {
- withSessionHandle { (client, handle) =>
- val preReq = new TExecuteStatementReq()
- preReq.setStatement("select engine_name()")
- preReq.setSessionHandle(handle)
- preReq.setRunAsync(false)
- client.ExecuteStatement(preReq)
-
- val sessionHandle = SessionHandle(handle)
- val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
- .getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
- session.client.getEngineAliveProbeProtocol.foreach(_.getTransport.close())
-
- val exitReq = new TExecuteStatementReq()
- exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
- "java_method('java.lang.System', 'exit', 1)")
- exitReq.setSessionHandle(handle)
- exitReq.setRunAsync(true)
- client.ExecuteStatement(exitReq)
-
- val executeStmtReq = new TExecuteStatementReq()
- executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
- executeStmtReq.setSessionHandle(handle)
- executeStmtReq.setRunAsync(false)
- val startTime = System.currentTimeMillis()
- val executeStmtResp = client.ExecuteStatement(executeStmtReq)
- assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
- assert(executeStmtResp.getStatus.getErrorMessage.contains(
- "java.net.SocketException: Connection reset") ||
- executeStmtResp.getStatus.getErrorMessage.contains(
- "Caused by: java.net.SocketException: Broken pipe (Write failed)"))
- val elapsedTime = System.currentTimeMillis() - startTime
- assert(elapsedTime < 20 * 1000)
- assert(session.client.asyncRequestInterrupted)
- }
+ assume(!httpMode)
+ withSessionConf(Map(
+ KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
+ KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
+ KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
+ Map.empty) {
+ withSessionHandle { (client, handle) =>
+ val preReq = new TExecuteStatementReq()
+ preReq.setStatement("select engine_name()")
+ preReq.setSessionHandle(handle)
+ preReq.setRunAsync(false)
+ client.ExecuteStatement(preReq)
+
+ val sessionHandle = SessionHandle(handle)
+ val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
+ .getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
+ session.client.getEngineAliveProbeProtocol.foreach(_.getTransport.close())
+
+ val exitReq = new TExecuteStatementReq()
+ exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
+ "java_method('java.lang.System', 'exit', 1)")
+ exitReq.setSessionHandle(handle)
+ exitReq.setRunAsync(true)
+ client.ExecuteStatement(exitReq)
+
+ val executeStmtReq = new TExecuteStatementReq()
+ executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
+ executeStmtReq.setSessionHandle(handle)
+ executeStmtReq.setRunAsync(false)
+ val startTime = System.currentTimeMillis()
+ val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+ assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
+ assert(executeStmtResp.getStatus.getErrorMessage.contains(
+ "java.net.SocketException: Connection reset") ||
+ executeStmtResp.getStatus.getErrorMessage.contains(
+ "Caused by: java.net.SocketException: Broken pipe (Write failed)"))
+ val elapsedTime = System.currentTimeMillis() - startTime
+ assert(elapsedTime < 20 * 1000)
+ assert(session.client.asyncRequestInterrupted)
}
}
}
@@ -243,4 +254,28 @@ class KyuubiOperationPerUserSuite
assert(rs.getString(1) === "3")
}
}
+
+ test("server info provider - server") {
+ assume(!httpMode)
+ withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
+ withSessionHandle { (client, handle) =>
+ val req = new TGetInfoReq()
+ req.setSessionHandle(handle)
+ req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+ assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Kyuubi (Incubating)")
+ }
+ }
+ }
+
+ test("server info provider - engine") {
+ assume(!httpMode)
+ withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
+ withSessionHandle { (client, handle) =>
+ val req = new TGetInfoReq()
+ req.setSessionHandle(handle)
+ req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+ assert(client.GetInfo(req).getInfoValue.getStringValue === "Spark SQL")
+ }
+ }
+ }
}