You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/05/12 00:30:28 UTC

[kyuubi] branch master updated: [KYUUBI #4792] [MINOR] Enhance hardcode session keywords and remove unused code

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fc23970c [KYUUBI #4792] [MINOR] Enhance hardcode session keywords and remove unused code
3fc23970c is described below

commit 3fc23970c6b971089a381d7b2900f02e23b2cdb7
Author: fwang12 <fw...@ebay.com>
AuthorDate: Fri May 12 08:30:18 2023 +0800

    [KYUUBI #4792] [MINOR] Enhance hardcode session keywords and remove unused code
    
    ### _Why are the changes needed?_
    
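    As the title says: replace the hardcoded session keywords (`use:catalog`, `use:database`) with shared constants, mark the engine pool size threshold as a server-only config, and remove unused code.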
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4792 from turboFei/remove_unused.
    
    Closes #4792
    
    fe568af7e [fwang12] server conf
    97f510020 [fwang12] save
    c44e70a58 [fwang12] remove unused code
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala    | 6 +++---
 .../apache/kyuubi/engine/spark/session/SparkSessionImpl.scala    | 6 +++---
 .../apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala    | 8 ++++----
 .../src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala     | 1 +
 .../src/main/scala/org/apache/kyuubi/session/package.scala       | 2 ++
 .../main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala    | 9 ++-------
 .../src/main/scala/org/apache/kyuubi/engine/EngineRef.scala      | 4 +++-
 .../main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala | 6 ++----
 8 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 09f5ac943..10a48f1a1 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -30,7 +30,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.flink.FlinkEngineUtils
-import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_CATALOG, USE_DATABASE}
 
 class FlinkSessionImpl(
     protocol: TProtocolVersion,
@@ -62,10 +62,10 @@ class FlinkSessionImpl(
     val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))
 
     val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
-      Array("use:catalog", "use:database").contains(k)
+      Array(USE_CATALOG, USE_DATABASE).contains(k)
     }
 
-    useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+    useCatalogAndDatabaseConf.get(USE_CATALOG).foreach { catalog =>
       try {
         executor.executeStatement(OperationHandle.create, s"USE CATALOG $catalog")
       } catch {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 96fc43e85..40a0c8c7f 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -28,7 +28,7 @@ import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
 import org.apache.kyuubi.engine.spark.udf.KDFRegistry
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
-import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_CATALOG, USE_DATABASE}
 
 class SparkSessionImpl(
     protocol: TProtocolVersion,
@@ -56,10 +56,10 @@ class SparkSessionImpl(
   override def open(): Unit = {
 
     val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
-      Array("use:catalog", "use:database").contains(k)
+      Array(USE_CATALOG, USE_DATABASE).contains(k)
     }
 
-    useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+    useCatalogAndDatabaseConf.get(USE_CATALOG).foreach { catalog =>
       try {
         SparkCatalogShim().setCurrentCatalog(spark, catalog)
       } catch {
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index 1a96bed73..6869e54dc 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -35,7 +35,7 @@ import org.apache.kyuubi.engine.trino.{TrinoConf, TrinoContext, TrinoStatement}
 import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
-import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_CATALOG, USE_DATABASE}
 
 class TrinoSessionImpl(
     protocol: TProtocolVersion,
@@ -59,12 +59,12 @@ class TrinoSessionImpl(
   override def open(): Unit = {
 
     val (useCatalogAndDatabaseConf, _) = normalizedConf.partition { case (k, _) =>
-      Array("use:catalog", "use:database").contains(k)
+      Array(USE_CATALOG, USE_DATABASE).contains(k)
     }
 
     useCatalogAndDatabaseConf.foreach {
-      case ("use:catalog", catalog) => catalogName = catalog
-      case ("use:database", database) => databaseName = database
+      case (USE_CATALOG, catalog) => catalogName = catalog
+      case (USE_DATABASE, database) => databaseName = database
     }
 
     val httpClient = new OkHttpClient.Builder().build()
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 8da336102..9ae1898c5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1863,6 +1863,7 @@ object KyuubiConf {
     .doc("This parameter is introduced as a server-side parameter " +
       "controlling the upper limit of the engine pool.")
     .version("1.4.0")
+    .serverOnly
     .intConf
     .checkValue(s => s > 0 && s < 33, "Invalid engine pool threshold, it should be in [1, 32]")
     .createWithDefault(9)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala
index 40abded98..63b17dd4d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala
@@ -25,6 +25,8 @@ package object session {
   val HIVECONF_PREFIX = "hiveconf:"
   val HIVEVAR_PREFIX = "hivevar:"
   val METACONF_PREFIX = "metaconf:"
+  val USE_CATALOG = "use:catalog"
+  val USE_DATABASE = "use:database"
 
   val SPARK_PREFIX = "spark."
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index a63646d9b..28806e915 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation}
+import org.apache.hadoop.security.{Credentials, SecurityUtil}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -36,12 +36,7 @@ import org.apache.kyuubi.config.KyuubiConf
 
 object KyuubiHadoopUtils extends Logging {
 
-  private val subjectField =
-    classOf[UserGroupInformation].getDeclaredField("subject")
-  subjectField.setAccessible(true)
-
-  private val tokenMapField =
-    classOf[Credentials].getDeclaredField("tokenMap")
+  private val tokenMapField = classOf[Credentials].getDeclaredField("tokenMap")
   tokenMapField.setAccessible(true)
 
   def newHadoopConf(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 765f36949..227cdd6c8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -43,6 +43,7 @@ import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT,
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.plugin.GroupProvider
+import org.apache.kyuubi.server.KyuubiServer
 
 /**
  * The description and functionality of an engine at server side
@@ -69,7 +70,8 @@ private[kyuubi] class EngineRef(
   private val engineType: EngineType = EngineType.withName(conf.get(ENGINE_TYPE))
 
   // Server-side engine pool size threshold
-  private val poolThreshold: Int = conf.get(ENGINE_POOL_SIZE_THRESHOLD)
+  private val poolThreshold: Int = Option(KyuubiServer.kyuubiServer).map(_.getConf)
+    .getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD)
 
   private val clientPoolSize: Int = conf.get(ENGINE_POOL_SIZE)
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 80df5c44d..8d5132ba4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -64,11 +64,9 @@ class KyuubiSessionImpl(
     }
   }
 
-  // TODO: needs improve the hardcode
   optimizedConf.foreach {
-    case ("use:catalog", _) =>
-    case ("use:database", _) =>
-    case ("kyuubi.engine.pool.size.threshold", _) =>
+    case (USE_CATALOG, _) =>
+    case (USE_DATABASE, _) =>
     case (key, value) => sessionConf.set(key, value)
   }