You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/04 02:56:53 UTC
[kyuubi] branch master updated: [KYUUBI #4522] `use:catalog` should execute before `use:database`
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f0796ec07 [KYUUBI #4522] `use:catalog` should execute before `use:database`
f0796ec07 is described below
commit f0796ec0783b386ef3530da73ac744e375986ca5
Author: senmiaoliu <se...@trip.com>
AuthorDate: Tue Apr 4 10:56:43 2023 +0800
[KYUUBI #4522] `use:catalog` should execute before `use:database`
### _Why are the changes needed?_
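close #4522. Previously `use:catalog` and `use:database` were applied in whatever order `normalizedConf` was iterated, so `use:database` could run before `use:catalog`. This change applies `use:catalog` first, then `use:database`, and only then the remaining session configs.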
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4648 from lsm1/fix/kyuubi_4522.
Closes #4522
e06046899 [senmiaoliu] use foreach
bd83d6623 [senmiaoliu] split normalizedConf
4d8445aac [senmiaoliu] avoid sort
eda34d480 [senmiaoliu] use catalog first
Authored-by: senmiaoliu <se...@trip.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../engine/flink/session/FlinkSessionImpl.scala | 45 +++++++++++++---------
.../engine/spark/session/SparkSessionImpl.scala | 41 ++++++++++++--------
.../engine/trino/session/TrinoSessionImpl.scala | 8 +++-
3 files changed, 58 insertions(+), 36 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index 75087b48c..a4b6a8a90 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -57,25 +57,34 @@ class FlinkSessionImpl(
override def open(): Unit = {
executor.openSession(handle.identifier.toString)
- normalizedConf.foreach {
- case ("use:catalog", catalog) =>
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- try {
- tableEnv.useCatalog(catalog)
- } catch {
- case NonFatal(e) =>
+
+ val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
+ Array("use:catalog", "use:database").contains(k)
+ }
+
+ useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+ val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+ try {
+ tableEnv.useCatalog(catalog)
+ } catch {
+ case NonFatal(e) =>
+ throw e
+ }
+ }
+
+ useCatalogAndDatabaseConf.get("use:database").foreach { database =>
+ val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+ try {
+ tableEnv.useDatabase(database)
+ } catch {
+ case NonFatal(e) =>
+ if (database != "default") {
throw e
- }
- case ("use:database", database) =>
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- try {
- tableEnv.useDatabase(database)
- } catch {
- case NonFatal(e) =>
- if (database != "default") {
- throw e
- }
- }
+ }
+ }
+ }
+
+ otherConf.foreach {
case (key, value) => setModifiableConfig(key, value)
}
super.open()
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 78164ff5f..96fc43e85 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -54,22 +54,31 @@ class SparkSessionImpl(
private val sessionEvent = SessionEvent(this)
override def open(): Unit = {
- normalizedConf.foreach {
- case ("use:catalog", catalog) =>
- try {
- SparkCatalogShim().setCurrentCatalog(spark, catalog)
- } catch {
- case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
- warn(e.getMessage())
- }
- case ("use:database", database) =>
- try {
- SparkCatalogShim().setCurrentDatabase(spark, database)
- } catch {
- case e
- if database == "default" && e.getMessage != null &&
- e.getMessage.contains("not found") =>
- }
+
+ val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
+ Array("use:catalog", "use:database").contains(k)
+ }
+
+ useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
+ try {
+ SparkCatalogShim().setCurrentCatalog(spark, catalog)
+ } catch {
+ case e if e.getMessage.contains("Cannot find catalog plugin class for catalog") =>
+ warn(e.getMessage())
+ }
+ }
+
+ useCatalogAndDatabaseConf.get("use:database").foreach { database =>
+ try {
+ SparkCatalogShim().setCurrentDatabase(spark, database)
+ } catch {
+ case e
+ if database == "default" && e.getMessage != null &&
+ e.getMessage.contains("not found") =>
+ }
+ }
+
+ otherConf.foreach {
case (key, value) => setModifiableConfig(key, value)
}
KDFRegistry.registerAll(spark)
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index 81f973b1b..1a96bed73 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -57,10 +57,14 @@ class TrinoSessionImpl(
private val sessionEvent = TrinoSessionEvent(this)
override def open(): Unit = {
- normalizedConf.foreach {
+
+ val (useCatalogAndDatabaseConf, _) = normalizedConf.partition { case (k, _) =>
+ Array("use:catalog", "use:database").contains(k)
+ }
+
+ useCatalogAndDatabaseConf.foreach {
case ("use:catalog", catalog) => catalogName = catalog
case ("use:database", database) => databaseName = database
- case _ => // do nothing
}
val httpClient = new OkHttpClient.Builder().build()