Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/07/28 13:33:03 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #857] Support init SQL for each session

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b963b75  [KYUUBI #857] Support init SQL for each session
b963b75 is described below

commit b963b75d580b8306b134b39ab876816c2d4c7a9f
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Wed Jul 28 21:32:48 2021 +0800

    [KYUUBI #857] Support init SQL for each session
    
    
    ### _Why are the changes needed?_
    This patch adds a new configuration, `kyuubi.engine.session.initialize.sql`, which
    holds a semicolon-separated list of SQL statements to run in each newly created
    engine session before user queries. It complements the existing
    `kyuubi.engine.initialize.sql`, which runs only once when the engine starts. In
    single Spark session mode, the session-level statements are also executed only
    once, at engine startup.
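    
    A minimal usage sketch (the values here are illustrative, not taken from this patch);
    the new entry is set the same way the updated `InitializeSQLSuite` below sets both
    entries, via `KyuubiConf`:
    
    ```scala
    import org.apache.kyuubi.config.KyuubiConf
    import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INITIALIZE_SQL, ENGINE_SESSION_INITIALIZE_SQL}
    
    // Engine-level statements run once at engine startup; session-level statements run
    // for each newly opened engine session (only once in single Spark session mode).
    val conf = KyuubiConf()
      .set(ENGINE_INITIALIZE_SQL.key, "SHOW DATABASES")
      .set(ENGINE_SESSION_INITIALIZE_SQL.key, "USE init_db;SET spark.sql.shuffle.partitions=8")
    ```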
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request
    
    Closes #857 from pan3793/seesion-init-sql.
    
    Closes #857
    
    f6f59196 [Cheng Pan] remove unnecessary change
    4960830a [Cheng Pan] run ENGINE_SESSION_INITIALIZE_SQL when engine startup
    6e8100fa [Cheng Pan] address comments
    08f4af62 [Cheng Pan] address comments
    2bb1bb85 [Cheng Pan] fix import
    880b38ec [Cheng Pan] session initialize sql will only execute once in single session mode
    2a646890 [Cheng Pan] test session initialize sql will not execute in single session mode
    f03f3eff [Cheng Pan] update doc
    6e4a72f2 [Cheng Pan] address comments
    34285e25 [Cheng Pan] format
    6d978025 [Cheng Pan] revert log4j conf change
    881a89e0 [Cheng Pan] improve test
    b9ec18c9 [Cheng Pan] fix ut and add log
    b4f586d8 [Cheng Pan] Support init SQL for each session
    
    Lead-authored-by: Cheng Pan <ch...@apache.org>
    Co-authored-by: Cheng Pan <37...@qq.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 docs/deployment/settings.md                        |  3 +-
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 15 ++++++++-
 .../spark/session/SparkSQLSessionManager.scala     | 12 +++++++-
 .../engine/spark/session/SingleSessionSuite.scala  | 34 ++++++++++++++++++--
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 14 +++++++--
 .../kyuubi/engine/spark/InitializeSQLSuite.scala   | 36 ++++++++++++++++++----
 6 files changed, 100 insertions(+), 14 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 1f7857a..274d254 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -146,7 +146,8 @@ kyuubi\.engine<br>\.deregister\.exception<br>\.classes|<div style='width: 65pt;w
 kyuubi\.engine<br>\.deregister\.exception<br>\.messages|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
 kyuubi\.engine<br>\.deregister\.exception<br>\.ttl|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself [...]
 kyuubi\.engine<br>\.deregister\.job\.max<br>\.failures|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>4</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of failures of job before deregistering the engine.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.2.0</div>
-kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
+kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
+kyuubi\.engine\.session<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
 kyuubi\.engine\.share<br>\.level|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Engines will be shared in different levels, available configs are: <ul> <li>CONNECTION: engine will not be shared but only used by the current client connection</li> <li>USER: engine will be shared by all sessions created by a unique username, see also kyuubi.engine.share.level.sub.domain</li> <li>SERVER: the Ap [...]
 kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a sub-domain for the share level of an engine. A sub-domain is a case-insensitive string values in `^[a-zA-Z_]{1,10}$` form. For example, for `USER` share level, an end-user can share a certain engine within a sub-domain, not for all of its clients. End-users a [...]
 kyuubi\.engine\.single<br>\.spark\.session|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
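
The two settings documented above are consumed identically by the Scala changes that follow; as a simplified, non-normative restatement of that shared logic (not a new API introduced by this patch):

```scala
import org.apache.spark.sql.SparkSession

// Statements are semicolon-separated, blank fragments are dropped, and each statement
// is executed eagerly with .show so that initialization failures surface right away.
def runInitializeSql(session: SparkSession, statements: String): Unit = {
  statements.split(";").filter(_.trim.nonEmpty).foreach { sql =>
    session.sql(sql).show()
  }
}
```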
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 367dc60..edc4369 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -100,7 +100,20 @@ object SparkSQLEngine extends Logging {
     }
 
     val session = SparkSession.builder.config(sparkConf).getOrCreate
-    kyuubiConf.get(KyuubiConf.ENGINE_INITIALIZE_SQL).split(";").foreach(session.sql(_).show)
+    kyuubiConf.get(KyuubiConf.ENGINE_INITIALIZE_SQL)
+      .split(";")
+      .filter(_.trim.nonEmpty)
+      .foreach { sql =>
+        info(s"Execute engine initializing sql: $sql")
+        session.sql(sql).show
+      }
+    kyuubiConf.get(KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL)
+      .split(";")
+      .filter(_.trim.nonEmpty)
+      .foreach { sql =>
+        info(s"Execute session initializing sql: $sql")
+        session.sql(sql).show
+      }
     session
   }
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 521022e..d1874a8 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -21,7 +21,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 
 import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_SINGLE_SPARK_SESSION}
+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.spark.SparkSQLEngine
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
@@ -59,6 +59,16 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
         spark.newSession()
       }
 
+      if (!this.conf.get(ENGINE_SINGLE_SPARK_SESSION)) {
+        this.conf.get(ENGINE_SESSION_INITIALIZE_SQL)
+          .split(";")
+          .filter(_.trim.nonEmpty)
+          .foreach { sql =>
+            info(s"Execute session initializing sql: $sql")
+            sparkSession.sql(sql).show
+          }
+      }
+
       sessionImpl.normalizedConf.foreach {
         case ("use:database", database) => sparkSession.catalog.setCurrentDatabase(database)
         case (key, value) => setModifiableConfig(sparkSession, key, value)
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SingleSessionSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SingleSessionSuite.scala
index 81e6c38..0dc48f7 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SingleSessionSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SingleSessionSuite.scala
@@ -25,11 +25,24 @@ class SingleSessionSuite extends WithSparkSQLEngine with JDBCTestUtils {
 
   override def withKyuubiConf: Map[String, String] = {
     Map(ENGINE_SHARE_LEVEL.key -> "SERVER",
-      ENGINE_SINGLE_SPARK_SESSION.key -> "true")
+      ENGINE_SINGLE_SPARK_SESSION.key -> "true",
+      (ENGINE_SESSION_INITIALIZE_SQL.key,
+        "CREATE DATABASE IF NOT EXISTS INIT_DB_SOLO;" +
+        "CREATE TABLE IF NOT EXISTS INIT_DB_SOLO.test(a int) USING CSV;" +
+        "INSERT INTO INIT_DB_SOLO.test VALUES (2);")
+    )
   }
 
-  override protected def jdbcUrl: String = s"jdbc:hive2://${engine.connectionUrl}/;#" +
-    s"spark.ui.enabled=false"
+  override def afterAll(): Unit = {
+    withJdbcStatement() { statement =>
+      statement.executeQuery("DROP TABLE IF EXISTS INIT_DB_SOLO.test")
+      statement.executeQuery("DROP DATABASE IF EXISTS INIT_DB_SOLO")
+    }
+    super.afterAll()
+  }
+
+  override protected def jdbcUrl: String =
+    s"jdbc:hive2://${engine.connectionUrl}/;#spark.ui.enabled=false"
 
   test("test single session") {
     withJdbcStatement() { statement =>
@@ -41,4 +54,19 @@ class SingleSessionSuite extends WithSparkSQLEngine with JDBCTestUtils {
     }
   }
 
+  test("test session initialize sql will only execute once in single session mode") {
+    withJdbcStatement() { statement =>
+      val result = statement.executeQuery("SELECT COUNT(*) FROM INIT_DB_SOLO.test WHERE a = 2")
+      assert(result.next())
+      assert(result.getLong(1) == 1)
+      assert(!result.next())
+    }
+    // the same session
+    withJdbcStatement() { statement =>
+      val result = statement.executeQuery("SELECT COUNT(*) FROM INIT_DB_SOLO.test WHERE a = 2")
+      assert(result.next())
+      assert(result.getLong(1) == 1)
+      assert(!result.next())
+    }
+  }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 5aeb4ee..36ef367 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -608,11 +608,21 @@ object KyuubiConf {
 
   val ENGINE_INITIALIZE_SQL: ConfigEntry[String] = buildConf("engine.initialize.sql")
     .doc("SemiColon-separated list of SQL statements to be initialized in the newly created " +
-      "engine before queries.")
+      "engine before queries. This configuration can not be used in JDBC url due to " +
+      "the limitation of Beeline/JDBC driver.")
     .version("1.2.0")
     .stringConf
     .createWithDefault("SHOW DATABASES")
 
+  val ENGINE_SESSION_INITIALIZE_SQL: ConfigEntry[String] =
+    buildConf("engine.session.initialize.sql")
+      .doc("SemiColon-separated list of SQL statements to be initialized in the newly created " +
+        "engine session before queries. This configuration can not be used in JDBC url due to " +
+        "the limitation of Beeline/JDBC driver.")
+      .version("1.3.0")
+      .stringConf
+      .createWithDefault("SHOW DATABASES")
+
   val ENGINE_DEREGISTER_EXCEPTION_CLASSES: ConfigEntry[Seq[String]] =
     buildConf("engine.deregister.exception.classes")
       .doc("A comma separated list of exception classes. If there is any exception thrown," +
@@ -660,7 +670,7 @@ object KyuubiConf {
     .stringConf
     .createOptional
 
-  val ENGINE_SINGLE_SPARK_SESSION =
+  val ENGINE_SINGLE_SPARK_SESSION: ConfigEntry[Boolean] =
     buildConf("engine.single.spark.session")
       .doc("When set to true, this engine is running in a single session mode. " +
         "All the JDBC/ODBC connections share the temporary views, function registries, " +
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala
index 6189f90..e4e0eb4 100644
--- a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/InitializeSQLSuite.scala
@@ -19,15 +19,20 @@ package org.apache.kyuubi.engine.spark
 
 import org.apache.kyuubi.WithKyuubiServer
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_INITIALIZE_SQL
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INITIALIZE_SQL, ENGINE_SESSION_INITIALIZE_SQL}
 import org.apache.kyuubi.operation.JDBCTestUtils
 
 class InitializeSQLSuite extends WithKyuubiServer with JDBCTestUtils {
   override protected val conf: KyuubiConf = {
-    KyuubiConf().set(ENGINE_INITIALIZE_SQL.key,
-      "CREATE DATABASE IF NOT EXISTS INIT_DB;" +
-        "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
-        "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);")
+    KyuubiConf()
+      .set(ENGINE_INITIALIZE_SQL.key,
+        "CREATE DATABASE IF NOT EXISTS INIT_DB;" +
+          "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
+          "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);")
+      .set(ENGINE_SESSION_INITIALIZE_SQL.key,
+        "CREATE DATABASE IF NOT EXISTS INIT_DB;" +
+          "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
+          "INSERT INTO INIT_DB.test VALUES (2);")
   }
 
   override def afterAll(): Unit = {
@@ -40,12 +45,31 @@ class InitializeSQLSuite extends WithKyuubiServer with JDBCTestUtils {
 
   test("KYUUBI-457: Support configurable initialize sql statement for engine startup") {
     withJdbcStatement() { statement =>
-      val result = statement.executeQuery("SELECT * FROM INIT_DB.test")
+      val result = statement.executeQuery("SELECT * FROM INIT_DB.test WHERE a = 1")
       assert(result.next())
       assert(result.getInt(1) == 1)
       assert(!result.next())
     }
   }
 
+  test("Support configurable initialize sql statement for engine session creation") {
+    var currentSessionCnt: Long = -1
+    withJdbcStatement() { statement =>
+      val result = statement.executeQuery("SELECT COUNT(*) FROM INIT_DB.test WHERE a = 2")
+      assert(result.next())
+      currentSessionCnt = result.getLong(1)
+      assert(currentSessionCnt >= 1)
+      assert(!result.next())
+    }
+    // new session
+    withJdbcStatement() { statement =>
+      val result = statement.executeQuery("SELECT COUNT(*) FROM INIT_DB.test WHERE a = 2")
+      assert(result.next())
+      // use great than or equals to support concurrent test
+      assert(result.getLong(1) >= currentSessionCnt + 1)
+      assert(!result.next())
+    }
+  }
+
   override protected def jdbcUrl: String = getJdbcUrl
 }