You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/04/16 17:27:37 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2308][SUB-TASK][KPIP-4] Batch job configuration ignore list and pre-defined configuration in server-side

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b42c735f [KYUUBI #2308][SUB-TASK][KPIP-4] Batch job configuration ignore list and pre-defined configuration in server-side
4b42c735f is described below

commit 4b42c735f79f6b2d17cba09e956b2b9fc21d6592
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Sun Apr 17 01:27:27 2022 +0800

    [KYUUBI #2308][SUB-TASK][KPIP-4] Batch job configuration ignore list and pre-defined configuration in server-side
    
    ### _Why are the changes needed?_
    
    To close #2308
    
    Support ignoring some batch configuration items and pre-defining some configurations on the server side.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2386 from turboFei/KYUUBI_2308_kpip4_batch_conf.
    
    Closes #2308
    
    2e517ec4 [Fei Wang] [KYUUBI #2308][SUB-TASK][KPIP-4] Batch job configuration ignore list and server predefined configurations
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 docs/deployment/settings.md                         |  1 +
 .../scala/org/apache/kyuubi/config/KyuubiConf.scala | 21 +++++++++++++++++++++
 .../org/apache/kyuubi/session/SessionManager.scala  | 18 ++++++++++++++++++
 .../org/apache/kyuubi/config/KyuubiConfSuite.scala  |  8 ++++++++
 .../kyuubi/operation/BatchJobSubmission.scala       |  3 ++-
 .../kyuubi/session/KyuubiBatchSessionImpl.scala     |  6 +++++-
 .../operation/KyuubiBatchYarnClusterSuite.scala     |  7 +++++--
 7 files changed, 60 insertions(+), 4 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 17b07ced3..304e6441e 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -159,6 +159,7 @@ Key | Default | Meaning | Type | Since
 Key | Default | Meaning | Type | Since
 --- | --- | --- | --- | ---
 <code>kyuubi.batch.application.check.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The interval to check batch job application information.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
+<code>kyuubi.batch.conf.ignore.list</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of ignored keys for batch conf. If the batch conf contains any of them, the key and the corresponding value will be removed silently during batch job submission. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tamper [...]
 
 
 ### Credentials
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 273831b06..d202f761e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -103,6 +103,11 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
     sys.env ++ getAllWithPrefix(KYUUBI_ENGINE_ENV_PREFIX, "")
   }
 
+  /** Get all batch conf as map */
+  def getBatchConf(batchType: String): Map[String, String] = {
+    getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.${batchType.toLowerCase(Locale.ROOT)}", "")
+  }
+
   /**
    * Retrieve key-value pairs from [[KyuubiConf]] starting with `dropped.remainder`, and put them to
    * the result map with the `dropped` of key being dropped.
@@ -165,6 +170,7 @@ object KyuubiConf {
   final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
   final val KYUUBI_HOME = "KYUUBI_HOME"
   final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
+  final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
 
   val kyuubiConfEntries: java.util.Map[String, ConfigEntry[_]] =
     java.util.Collections.synchronizedMap(new java.util.HashMap[String, ConfigEntry[_]]())
@@ -813,6 +819,21 @@ object KyuubiConf {
       .timeConf
       .createWithDefaultString("PT5S")
 
+  val BATCH_CONF_IGNORE_LIST: ConfigEntry[Seq[String]] =
+    buildConf("kyuubi.batch.conf.ignore.list")
+      .doc("A comma separated list of ignored keys for batch conf. If the batch conf contains" +
+        " any of them, the key and the corresponding value will be removed silently during batch" +
+        " job submission." +
+        " Note that this rule is for server-side protection defined via administrators to" +
+        " prevent some essential configs from tampering." +
+        " You can also pre-define some config for batch job submission with prefix:" +
+        " kyuubi.batchConf.[batchType]. For example, you can pre-define `spark.master`" +
+        " for spark batch job with key `kyuubi.batchConf.spark.spark.master`.")
+      .version("1.6.0")
+      .stringConf
+      .toSequence()
+      .createWithDefault(Nil)
+
   val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
     buildConf("kyuubi.backend.server.exec.pool.size")
       .doc("Number of threads in the operation execution thread pool of Kyuubi server")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 992902c40..c0025f9b0 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -162,10 +162,13 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
 
   private var _confRestrictList: Set[String] = _
   private var _confIgnoreList: Set[String] = _
+  private var _batchConfIgnoreList: Set[String] = _
   private lazy val _confRestrictMatchList: Set[String] =
     _confRestrictList.filter(_.endsWith(".*")).map(_.stripSuffix(".*"))
   private lazy val _confIgnoreMatchList: Set[String] =
     _confIgnoreList.filter(_.endsWith(".*")).map(_.stripSuffix(".*"))
+  private lazy val _batchConfIgnoreMatchList: Set[String] =
+    _batchConfIgnoreList.filter(_.endsWith(".*")).map(_.stripSuffix(".*"))
 
   // strip prefix and validate whether if key is restricted, ignored or valid
   def validateKey(key: String, value: String): Option[(String, String)] = {
@@ -206,6 +209,20 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
     case (k, v) => validateKey(k, v)
   }
 
+  // validate whether a batch key should be ignored
+  def validateBatchKey(key: String, value: String): Option[(String, String)] = {
+    if (_batchConfIgnoreMatchList.exists(key.startsWith(_)) || _batchConfIgnoreList.contains(key)) {
+      warn(s"$key is a ignored batch key according to the server-side configuration")
+      None
+    } else {
+      Some((key, value))
+    }
+  }
+
+  def validateBatchConf(config: Map[String, String]): Map[String, String] = config.flatMap {
+    case (k, v) => validateBatchKey(k, v)
+  }
+
   override def initialize(conf: KyuubiConf): Unit = synchronized {
     addService(operationManager)
     initOperationLogRootDir()
@@ -232,6 +249,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
 
     _confRestrictList = conf.get(SESSION_CONF_RESTRICT_LIST).toSet
     _confIgnoreList = conf.get(SESSION_CONF_IGNORE_LIST).toSet
+    _batchConfIgnoreList = conf.get(BATCH_CONF_IGNORE_LIST).toSet
 
     execPool = ThreadUtils.newDaemonQueuedThreadPool(
       poolSize,
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
index ee2d562e4..05cf23396 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
@@ -171,4 +171,12 @@ class KyuubiConfSuite extends KyuubiFunSuite {
     kyuubiConf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, path)
     assert(kyuubiConf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).get == path)
   }
+
+  test("get pre-defined batch conf for different batch types") {
+    val kyuubiConf = KyuubiConf()
+    kyuubiConf.set(s"$KYUUBI_BATCH_CONF_PREFIX.spark.spark.yarn.tags", "kyuubi")
+    kyuubiConf.set(s"$KYUUBI_BATCH_CONF_PREFIX.flink.yarn.tags", "kyuubi")
+    assert(kyuubiConf.getBatchConf("spark") == Map("spark.yarn.tags" -> "kyuubi"))
+    assert(kyuubiConf.getBatchConf("flink") == Map("yarn.tags" -> "kyuubi"))
+  }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 8303238c2..d8e901187 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -82,11 +82,12 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: BatchReq
   private def submitBatchJob(): Unit = {
     builder = Option(batchRequest.batchType).map(_.toUpperCase(Locale.ROOT)) match {
       case Some("SPARK") =>
+        val batchSparkConf = session.sessionConf.getBatchConf("spark")
         new SparkBatchProcessBuilder(
           session.user,
           session.sessionConf,
           session.batchId,
-          batchRequest,
+          batchRequest.copy(conf = batchSparkConf ++ batchRequest.conf),
           getOperationLog)
 
       case _ =>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 57beeb4eb..497c4fb88 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -40,8 +40,12 @@ class KyuubiBatchSessionImpl(
   override val handle: SessionHandle = sessionManager.newBatchSessionHandle(protocol)
   val batchId: String = handle.identifier.toString
 
+  // TODO: Support batch conf advisor
+  override val normalizedConf: Map[String, String] =
+    sessionManager.validateBatchConf(Option(batchRequest.conf).getOrElse(Map.empty))
+
   private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
-    .newBatchJobSubmissionOperation(this, batchRequest)
+    .newBatchJobSubmissionOperation(this, batchRequest.copy(conf = normalizedConf))
 
   private val sessionEvent = KyuubiSessionEvent(this)
   EventBus.post(sessionEvent)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
index 7f8835277..28f434faf 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala
@@ -32,7 +32,10 @@ import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
 class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
   override protected val connectionConf: Map[String, String] = Map.empty
 
-  override protected val kyuubiServerConf: KyuubiConf = KyuubiConf()
+  override protected val kyuubiServerConf: KyuubiConf = {
+    KyuubiConf().set(s"$KYUUBI_BATCH_CONF_PREFIX.spark.spark.master", "yarn")
+      .set(BATCH_CONF_IGNORE_LIST, Seq("spark.master"))
+  }
 
   private def sessionManager(): KyuubiSessionManager =
     server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
@@ -47,7 +50,7 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
       sparkProcessBuilder.mainClass,
       "spark-batch-submission",
       Map(
-        "spark.master" -> "yarn",
+        "spark.master" -> "local",
         s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
         s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
       Seq.empty[String])