You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/05/12 00:30:28 UTC
[kyuubi] branch master updated: [KYUUBI #4792] [MINOR] Enhance hardcode session keywords and remove unused code
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3fc23970c [KYUUBI #4792] [MINOR] Enhance hardcode session keywords and remove unused code
3fc23970c is described below
commit 3fc23970c6b971089a381d7b2900f02e23b2cdb7
Author: fwang12 <fw...@ebay.com>
AuthorDate: Fri May 12 08:30:18 2023 +0800
[KYUUBI #4792] [MINOR] Enhance hardcode session keywords and remove unused code
### _Why are the changes needed?_
As title.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4792 from turboFei/remove_unused.
Closes #4792
fe568af7e [fwang12] server conf
97f510020 [fwang12] save
c44e70a58 [fwang12] remove unused code
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala | 6 +++---
.../apache/kyuubi/engine/spark/session/SparkSessionImpl.scala | 6 +++---
.../apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala | 8 ++++----
.../src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala | 1 +
.../src/main/scala/org/apache/kyuubi/session/package.scala | 2 ++
.../main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala | 9 ++-------
.../src/main/scala/org/apache/kyuubi/engine/EngineRef.scala | 4 +++-
.../main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala | 6 ++----
8 files changed, 20 insertions(+), 22 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 09f5ac943..10a48f1a1 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -30,7 +30,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
-import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_CATALOG, USE_DATABASE}
class FlinkSessionImpl(
protocol: TProtocolVersion,
@@ -62,10 +62,10 @@ class FlinkSessionImpl(
val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))
val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
- Array("use:catalog", "use:database").contains(k)
+ Array(USE_CATALOG, USE_DATABASE).contains(k)
}
- useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+ useCatalogAndDatabaseConf.get(USE_CATALOG).foreach { catalog =>
try {
executor.executeStatement(OperationHandle.create, s"USE CATALOG $catalog")
} catch {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 96fc43e85..40a0c8c7f 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -28,7 +28,7 @@ import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
-import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_CATALOG, USE_DATABASE}
class SparkSessionImpl(
protocol: TProtocolVersion,
@@ -56,10 +56,10 @@ class SparkSessionImpl(
override def open(): Unit = {
val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
- Array("use:catalog", "use:database").contains(k)
+ Array(USE_CATALOG, USE_DATABASE).contains(k)
}
- useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+ useCatalogAndDatabaseConf.get(USE_CATALOG).foreach { catalog =>
try {
SparkCatalogShim().setCurrentCatalog(spark, catalog)
} catch {
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index 1a96bed73..6869e54dc 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -35,7 +35,7 @@ import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement}
import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
-import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_CATALOG, USE_DATABASE}
class TrinoSessionImpl(
protocol: TProtocolVersion,
@@ -59,12 +59,12 @@ class TrinoSessionImpl(
override def open(): Unit = {
val (useCatalogAndDatabaseConf, _) = normalizedConf.partition { case (k, _) =>
- Array("use:catalog", "use:database").contains(k)
+ Array(USE_CATALOG, USE_DATABASE).contains(k)
}
useCatalogAndDatabaseConf.foreach {
- case ("use:catalog", catalog) => catalogName = catalog
- case ("use:database", database) => databaseName = database
+ case (USE_CATALOG, catalog) => catalogName = catalog
+ case (USE_DATABASE, database) => databaseName = database
}
val httpClient = new OkHttpClient.Builder().build()
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 8da336102..9ae1898c5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1863,6 +1863,7 @@ object KyuubiConf {
.doc("This parameter is introduced as a server-side parameter " +
"controlling the upper limit of the engine pool.")
.version("1.4.0")
+ .serverOnly
.intConf
.checkValue(s => s > 0 && s < 33, "Invalid engine pool threshold, it should be in [1, 32]")
.createWithDefault(9)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala
index 40abded98..63b17dd4d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala
@@ -25,6 +25,8 @@ package object session {
val HIVECONF_PREFIX = "hiveconf:"
val HIVEVAR_PREFIX = "hivevar:"
val METACONF_PREFIX = "metaconf:"
+ val USE_CATALOG = "use:catalog"
+ val USE_DATABASE = "use:database"
val SPARK_PREFIX = "spark."
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index a63646d9b..28806e915 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation}
+import org.apache.hadoop.security.{Credentials, SecurityUtil}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -36,12 +36,7 @@ import org.apache.kyuubi.config.KyuubiConf
object KyuubiHadoopUtils extends Logging {
- private val subjectField =
- classOf[UserGroupInformation].getDeclaredField("subject")
- subjectField.setAccessible(true)
-
- private val tokenMapField =
- classOf[Credentials].getDeclaredField("tokenMap")
+ private val tokenMapField = classOf[Credentials].getDeclaredField("tokenMap")
tokenMapField.setAccessible(true)
def newHadoopConf(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 765f36949..227cdd6c8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -43,6 +43,7 @@ import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT,
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.plugin.GroupProvider
+import org.apache.kyuubi.server.KyuubiServer
/**
* The description and functionality of an engine at server side
@@ -69,7 +70,8 @@ private[kyuubi] class EngineRef(
private val engineType: EngineType = EngineType.withName(conf.get(ENGINE_TYPE))
// Server-side engine pool size threshold
- private val poolThreshold: Int = conf.get(ENGINE_POOL_SIZE_THRESHOLD)
+ private val poolThreshold: Int = Option(KyuubiServer.kyuubiServer).map(_.getConf)
+ .getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD)
private val clientPoolSize: Int = conf.get(ENGINE_POOL_SIZE)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 80df5c44d..8d5132ba4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -64,11 +64,9 @@ class KyuubiSessionImpl(
}
}
- // TODO: needs improve the hardcode
optimizedConf.foreach {
- case ("use:catalog", _) =>
- case ("use:database", _) =>
- case ("kyuubi.engine.pool.size.threshold", _) =>
+ case (USE_CATALOG, _) =>
+ case (USE_DATABASE, _) =>
case (key, value) => sessionConf.set(key, value)
}