You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/08/10 09:28:07 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #914] Use isEmpty to trigger default init queries

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dd8dfb  [KYUUBI #914] Use isEmpty to trigger default init queries
6dd8dfb is described below

commit 6dd8dfb70c2cbe7f92cf42ca68444c404767f699
Author: Kent Yao <ya...@apache.org>
AuthorDate: Tue Aug 10 17:27:47 2021 +0800

    [KYUUBI #914] Use isEmpty to trigger default init queries
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    1. Use `isEmpty` instead of `show` for the init SQLs.
    2. Add a job description for the SQL tab of the Spark UI.
    3. Make it simple.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #914 from yaooqinn/isempty.
    
    Closes #914
    
    70785da7 [Kent Yao] Use isEmpty to trigger default init queries
    438009e4 [Kent Yao] Use isEmpty to trigger default init queries
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 docs/deployment/settings.md                        |  4 +--
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 19 +++++--------
 .../spark/session/SparkSQLSessionManager.scala     | 21 +++++++--------
 .../src/main/scala/org/apache/kyuubi/Utils.scala   |  4 +--
 .../org/apache/kyuubi/config/ConfigBuilder.scala   |  4 +--
 .../org/apache/kyuubi/config/ConfigHelpers.scala   |  4 +--
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 31 ++++++++++++----------
 .../apache/kyuubi/config/ConfigBuilderSuite.scala  |  2 +-
 .../org/apache/kyuubi/metrics/MetricsConf.scala    |  2 +-
 9 files changed, 42 insertions(+), 49 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 61c7f5d..0cc54c5 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -148,8 +148,8 @@ kyuubi\.engine<br>\.deregister\.exception<br>\.ttl|<div style='width: 65pt;word-
 kyuubi\.engine<br>\.deregister\.job\.max<br>\.failures|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>4</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of failures of job before deregistering the engine.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.2.0</div>
 kyuubi\.engine\.event<br>\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
 kyuubi\.engine\.event<br>\.loggers|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the spark history events</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done. [...]
-kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
-kyuubi\.engine\.session<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
+kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
+kyuubi\.engine\.session<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
 kyuubi\.engine\.share<br>\.level|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Engines will be shared in different levels, available configs are: <ul> <li>CONNECTION: engine will not be shared but only used by the current client connection</li> <li>USER: engine will be shared by all sessions created by a unique username, see also kyuubi.engine.share.level.sub.domain</li> <li>SERVER: the Ap [...]
 kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a sub-domain for the share level of an engine. A sub-domain is a case-insensitive string values in `^[a-zA-Z_]{1,10}$` form. For example, for `USER` share level, an end-user can share a certain engine within a sub-domain, not for all of its clients. End-users a [...]
 kyuubi\.engine\.single<br>\.spark\.session|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 3c18cf2..b21fa35 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.Utils._
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
@@ -115,19 +116,11 @@ object SparkSQLEngine extends Logging {
     }
 
     val session = SparkSession.builder.config(sparkConf).getOrCreate
-    kyuubiConf.get(KyuubiConf.ENGINE_INITIALIZE_SQL)
-      .split(";")
-      .filter(_.trim.nonEmpty)
-      .foreach { sql =>
-        info(s"Execute engine initializing sql: $sql")
-        session.sql(sql).show
-      }
-    kyuubiConf.get(KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL)
-      .split(";")
-      .filter(_.trim.nonEmpty)
-      .foreach { sql =>
-        info(s"Execute session initializing sql: $sql")
-        session.sql(sql).show
+    (kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
+      .foreach { sqlStr =>
+        session.sparkContext.setJobGroup(appName, sqlStr, interruptOnCancel = true)
+        debug(s"Execute session initializing sql: $sqlStr")
+        session.sql(sqlStr).isEmpty
       }
     session
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index d1874a8..d919479 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -43,6 +43,8 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
 
   val operationManager = new SparkSQLOperationManager()
 
+  private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
+
   override def openSession(
       protocol: TProtocolVersion,
       user: String,
@@ -53,20 +55,15 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
     val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this)
     val handle = sessionImpl.handle
     try {
-      val sparkSession = if (this.conf.get(ENGINE_SINGLE_SPARK_SESSION)) {
+      val sparkSession = if (singleSparkSession) {
         spark
       } else {
-        spark.newSession()
-      }
-
-      if (!this.conf.get(ENGINE_SINGLE_SPARK_SESSION)) {
-        this.conf.get(ENGINE_SESSION_INITIALIZE_SQL)
-          .split(";")
-          .filter(_.trim.nonEmpty)
-          .foreach { sql =>
-            info(s"Execute session initializing sql: $sql")
-            sparkSession.sql(sql).show
-          }
+        val ss = spark.newSession()
+        this.conf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sqlStr =>
+          ss.sparkContext.setJobGroup(handle.identifier.toString, sqlStr, interruptOnCancel = true)
+          ss.sql(sqlStr).isEmpty
+        }
+        ss
       }
 
       sessionImpl.normalizedConf.foreach {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index c1860f3..5fd4768 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -35,9 +35,9 @@ object Utils extends Logging {
 
   import org.apache.kyuubi.config.KyuubiConf._
 
-  def strToSeq(s: String): Seq[String] = {
+  def strToSeq(s: String, sp: String = ","): Seq[String] = {
     require(s != null)
-    s.split(",").map(_.trim).filter(_.nonEmpty)
+    s.split(sp).map(_.trim).filter(_.nonEmpty)
   }
 
   def getSystemProperties: Map[String, String] = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigBuilder.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigBuilder.scala
index 8012a55..a0a497a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigBuilder.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigBuilder.scala
@@ -145,9 +145,9 @@ private[kyuubi] case class TypedConfigBuilder[T](
   }
 
   /** Turns the config entry into a sequence of values of the underlying type. */
-  def toSequence: TypedConfigBuilder[Seq[T]] = {
+  def toSequence(sp: String = ","): TypedConfigBuilder[Seq[T]] = {
     parent._type = "seq"
-    TypedConfigBuilder(parent, strToSeq(_, fromStr), seqToStr(_, toStr))
+    TypedConfigBuilder(parent, strToSeq(_, fromStr, sp), seqToStr(_, toStr))
   }
 
   def createOptional: OptionalConfigEntry[T] = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigHelpers.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigHelpers.scala
index 3538e1a..225f1b5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigHelpers.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/ConfigHelpers.scala
@@ -21,8 +21,8 @@ import org.apache.kyuubi.Utils
 
 object ConfigHelpers {
 
-  def strToSeq[T](str: String, converter: String => T): Seq[T] = {
-    Utils.strToSeq(str).map(converter)
+  def strToSeq[T](str: String, converter: String => T, sp: String): Seq[T] = {
+    Utils.strToSeq(str, sp).map(converter)
   }
 
   def seqToStr[T](v: Seq[T], stringConverter: T => String): String = {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index aa11fcf..069dd71 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -463,7 +463,7 @@ object KyuubiConf {
         " configurations via SET syntax.")
       .version("1.2.0")
       .stringConf
-      .toSequence
+      .toSequence()
       .createWithDefault(Nil)
 
   val SESSION_CONF_RESTRICT_LIST: ConfigEntry[Seq[String]] =
@@ -476,7 +476,7 @@ object KyuubiConf {
         " configurations via SET syntax.")
       .version("1.2.0")
       .stringConf
-      .toSequence
+      .toSequence()
       .createWithDefault(Nil)
 
   val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
@@ -606,22 +606,25 @@ object KyuubiConf {
     .version("1.2.0")
     .fallbackConf(LEGACY_ENGINE_SHARE_LEVEL)
 
-  val ENGINE_INITIALIZE_SQL: ConfigEntry[String] = buildConf("engine.initialize.sql")
-    .doc("SemiColon-separated list of SQL statements to be initialized in the newly created " +
-      "engine before queries. This configuration can not be used in JDBC url due to " +
-      "the limitation of Beeline/JDBC driver.")
-    .version("1.2.0")
-    .stringConf
-    .createWithDefault("SHOW DATABASES")
+  val ENGINE_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
+    buildConf("engine.initialize.sql")
+      .doc("SemiColon-separated list of SQL statements to be initialized in the newly created " +
+        "engine before queries. This configuration can not be used in JDBC url due to " +
+        "the limitation of Beeline/JDBC driver.")
+      .version("1.2.0")
+      .stringConf
+      .toSequence(";")
+      .createWithDefaultString("SHOW DATABASES")
 
-  val ENGINE_SESSION_INITIALIZE_SQL: ConfigEntry[String] =
+  val ENGINE_SESSION_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
     buildConf("engine.session.initialize.sql")
       .doc("SemiColon-separated list of SQL statements to be initialized in the newly created " +
         "engine session before queries. This configuration can not be used in JDBC url due to " +
         "the limitation of Beeline/JDBC driver.")
       .version("1.3.0")
       .stringConf
-      .createWithDefault("SHOW DATABASES")
+      .toSequence(";")
+      .createWithDefaultString("SHOW DATABASES")
 
   val ENGINE_DEREGISTER_EXCEPTION_CLASSES: ConfigEntry[Seq[String]] =
     buildConf("engine.deregister.exception.classes")
@@ -629,7 +632,7 @@ object KyuubiConf {
         " whose class matches the specified classes, the engine would deregister itself.")
       .version("1.2.0")
       .stringConf
-      .toSequence
+      .toSequence()
       .createWithDefault(Nil)
 
   val ENGINE_DEREGISTER_EXCEPTION_MESSAGES: ConfigEntry[Seq[String]] =
@@ -639,7 +642,7 @@ object KyuubiConf {
         " deregister itself.")
       .version("1.2.0")
       .stringConf
-      .toSequence
+      .toSequence()
       .createWithDefault(Nil)
 
   val ENGINE_DEREGISTER_JOB_MAX_FAILURES: ConfigEntry[Int] =
@@ -698,7 +701,7 @@ object KyuubiConf {
       .version("1.3.0")
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
-      .toSequence
+      .toSequence()
       .checkValue(_.toSet.subsetOf(Set("SPARK", "JSON", "JDBC", "CUSTOM")),
         "Unsupported event loggers")
       .createWithDefault(Nil)
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/ConfigBuilderSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/ConfigBuilderSuite.scala
index 8dc27fa..df16964 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/ConfigBuilderSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/ConfigBuilderSuite.scala
@@ -64,7 +64,7 @@ class ConfigBuilderSuite extends KyuubiFunSuite {
     assert(stringConf.defaultVal.get === "kent, yao")
     val sequenceConf = ConfigBuilder("kyuubi.sequence.conf")
       .stringConf
-      .toSequence
+      .toSequence()
       .createWithDefault(Nil)
     assert(sequenceConf.defaultVal.get.isEmpty)
     val kyuubiConf = KyuubiConf().set(sequenceConf.key, "kyuubi,kent")
diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConf.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConf.scala
index b3fa798..5e1112d 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConf.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConf.scala
@@ -45,7 +45,7 @@ object MetricsConf {
     .version("1.2.0")
     .stringConf
     .transform(_.toUpperCase())
-    .toSequence
+    .toSequence()
     .createWithDefault(Seq(JSON.toString))
 
   val METRICS_CONSOLE_INTERVAL: ConfigEntry[Long] = buildConf("metrics.console.interval")