You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/04 02:56:53 UTC

[kyuubi] branch master updated: [KYUUBI #4522] `use:catalog` should execute before than `use:database`

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f0796ec07 [KYUUBI #4522] `use:catalog` should execute before than `use:database`
f0796ec07 is described below

commit f0796ec0783b386ef3530da73ac744e375986ca5
Author: senmiaoliu <se...@trip.com>
AuthorDate: Tue Apr 4 10:56:43 2023 +0800

    [KYUUBI #4522] `use:catalog` should execute before than `use:database`
    
    ### _Why are the changes needed?_
    
    close #4522
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4648 from lsm1/fix/kyuubi_4522.
    
    Closes #4522
    
    e06046899 [senmiaoliu] use foreach
    bd83d6623 [senmiaoliu] spilt narmalizedConf
    4d8445aac [senmiaoliu] avoid sort
    eda34d480 [senmiaoliu] use catalog first
    
    Authored-by: senmiaoliu <se...@trip.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../engine/flink/session/FlinkSessionImpl.scala    | 45 +++++++++++++---------
 .../engine/spark/session/SparkSessionImpl.scala    | 41 ++++++++++++--------
 .../engine/trino/session/TrinoSessionImpl.scala    |  8 +++-
 3 files changed, 58 insertions(+), 36 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 75087b48c..a4b6a8a90 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -57,25 +57,34 @@ class FlinkSessionImpl(
 
   override def open(): Unit = {
     executor.openSession(handle.identifier.toString)
-    normalizedConf.foreach {
-      case ("use:catalog", catalog) =>
-        val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
-        try {
-          tableEnv.useCatalog(catalog)
-        } catch {
-          case NonFatal(e) =>
+
+    val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
+      Array("use:catalog", "use:database").contains(k)
+    }
+
+    useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      try {
+        tableEnv.useCatalog(catalog)
+      } catch {
+        case NonFatal(e) =>
+          throw e
+      }
+    }
+
+    useCatalogAndDatabaseConf.get("use:database").foreach { database =>
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      try {
+        tableEnv.useDatabase(database)
+      } catch {
+        case NonFatal(e) =>
+          if (database != "default") {
             throw e
-        }
-      case ("use:database", database) =>
-        val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
-        try {
-          tableEnv.useDatabase(database)
-        } catch {
-          case NonFatal(e) =>
-            if (database != "default") {
-              throw e
-            }
-        }
+          }
+      }
+    }
+
+    otherConf.foreach {
       case (key, value) => setModifiableConfig(key, value)
     }
     super.open()
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 78164ff5f..96fc43e85 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -54,22 +54,31 @@ class SparkSessionImpl(
   private val sessionEvent = SessionEvent(this)
 
   override def open(): Unit = {
-    normalizedConf.foreach {
-      case ("use:catalog", catalog) =>
-        try {
-          SparkCatalogShim().setCurrentCatalog(spark, catalog)
-        } catch {
-          case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
-            warn(e.getMessage())
-        }
-      case ("use:database", database) =>
-        try {
-          SparkCatalogShim().setCurrentDatabase(spark, database)
-        } catch {
-          case e
-              if database == "default" && e.getMessage != null &&
-                e.getMessage.contains("not found") =>
-        }
+
+    val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
+      Array("use:catalog", "use:database").contains(k)
+    }
+
+    useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+      try {
+        SparkCatalogShim().setCurrentCatalog(spark, catalog)
+      } catch {
+        case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
+          warn(e.getMessage())
+      }
+    }
+
+    useCatalogAndDatabaseConf.get("use:database").foreach { database =>
+      try {
+        SparkCatalogShim().setCurrentDatabase(spark, database)
+      } catch {
+        case e
+            if database == "default" && e.getMessage != null &&
+              e.getMessage.contains("not found") =>
+      }
+    }
+
+    otherConf.foreach {
       case (key, value) => setModifiableConfig(key, value)
     }
     KDFRegistry.registerAll(spark)
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index 81f973b1b..1a96bed73 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -57,10 +57,14 @@ class TrinoSessionImpl(
   private val sessionEvent = TrinoSessionEvent(this)
 
   override def open(): Unit = {
-    normalizedConf.foreach {
+
+    val (useCatalogAndDatabaseConf, _) = normalizedConf.partition { case (k, _) =>
+      Array("use:catalog", "use:database").contains(k)
+    }
+
+    useCatalogAndDatabaseConf.foreach {
       case ("use:catalog", catalog) => catalogName = catalog
       case ("use:database", database) => databaseName = database
-      case _ => // do nothing
     }
 
     val httpClient = new OkHttpClient.Builder().build()