Posted to commits@kyuubi.apache.org by ch...@apache.org on 2024/01/30 15:40:03 UTC

(kyuubi) branch branch-1.8 updated: [KYUUBI #6028] Exited spark-submit process should not block batch submit queue

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 6b12e31b6 [KYUUBI #6028] Exited spark-submit process should not block batch submit queue
6b12e31b6 is described below

commit 6b12e31b68640b20951b0336cd190ef15f3b34e5
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Jan 30 23:38:52 2024 +0800

    [KYUUBI #6028] Exited spark-submit process should not block batch submit queue
    
    While enabling batch implementation V2 with the following configurations
    ```
    kyuubi.batch.impl.version=2
    kyuubi.batch.submitter.enabled=true
    kyuubi.batch.submitter.threads=48
    spark.master=yarn
    spark.submit.deployMode=cluster
    spark.yarn.submit.waitAppCompletion=false
    ```
    
    I found that batch jobs get blocked in the DB queue once a YARN queue has no resources. This brings an issue: subsequent batch jobs destined for another YARN queue are also queued in the DB, rather than in the YARN queue.
    
    ```
    mysql> select state, engine_state, count(1) from metadata where state in ('INITIALIZED', 'PENDING', 'RUNNING') group by state, engine_state;
    +-------------+--------------+----------+
    | state       | engine_state | count(1) |
    +-------------+--------------+----------+
    | INITIALIZED | NULL         |      166 |
    | PENDING     | NULL         |        1 |
    | RUNNING     | PENDING      |      148 |
    | RUNNING     | RUNNING      |      415 |
    +-------------+--------------+----------+
    ```
    
    The submitter queue, whose size is controlled by `kyuubi.batch.submitter.threads`, is designed to address the `spark-submit` process concurrency issue: too many concurrent `spark-submit` processes may exhaust the Kyuubi server node's CPU/memory resources and eventually crash the service. For Spark YARN cluster mode, if `spark.yarn.submit.waitAppCompletion=false` is set, the local `spark-submit` process exits immediately once the application reaches ACCEPTED status, even if no resource could be allocated [...]
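    
    To make the fix easier to follow, here is a minimal, self-contained sketch of the kind of slot-release check the patch below adds around `KyuubiBatchService`; the object and method names (`SubmitSlotSketch`, `canReleaseSlot`) and the simplified `AppState` type are illustrative stand-ins, not the actual Kyuubi classes.
    
    ```
    object SubmitSlotSketch {
      // Simplified stand-in for the application state reported by the resource manager.
      sealed trait AppState
      case object NotFound extends AppState
      case object Pending extends AppState
      case object Unknown extends AppState
      case object Running extends AppState
    
      /** Returns true when the submitter slot can be released for the next batch. */
      def canReleaseSlot(appState: Option[AppState], startupProcessAlive: Boolean): Boolean =
        appState match {
          // not submitted to the resource manager yet: keep the slot occupied
          case None | Some(NotFound) => false
          // pending in the resource manager while the local spark-submit process
          // is still alive (e.g. waiting for YARN queue resources)
          case Some(Pending) if startupProcessAlive => false
          // state unknown: keep the slot for safety
          case Some(Unknown) => false
          // spark-submit already exited or the app is making progress: free the slot
          case _ => true
        }
    }
    ```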
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    Passed GA, and rolled out to an internal cluster.
    
    ---
    
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6028 from pan3793/batch-submit.
    
    Closes #6028
    
    05fcc758f [Cheng Pan] Exited spark-submit process should not block batch submit queue
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit 208354c327104be0549503f760ce61d376396025)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala      | 2 +-
 .../src/main/scala/org/apache/kyuubi/engine/EngineRef.scala      | 2 +-
 .../src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala    | 8 +++-----
 .../org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala     | 2 +-
 .../scala/org/apache/kyuubi/operation/BatchJobSubmission.scala   | 3 +++
 .../main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala | 9 +++++++--
 .../scala/org/apache/kyuubi/session/KyuubiBatchSession.scala     | 2 ++
 7 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index ea804575e..79852a42b 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
     Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
       conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
       val builder = new SparkProcessBuilder("test", conf)
-      val e = intercept[KyuubiException](builder.validateConf)
+      val e = intercept[KyuubiException](builder.validateConf())
       assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" +
         s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" +
         "working-with-objects/names/#dns-subdomain-names and the value length <= 237")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index e15645133..952ed1ef5 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -215,7 +215,7 @@ private[kyuubi] class EngineRef(
       acquiredPermit = true
       val redactedCmd = builder.toString
       info(s"Launching engine:\n$redactedCmd")
-      builder.validateConf
+      builder.validateConf()
       val process = builder.start
       var exitValue: Option[Int] = None
       var lastApplicationInfo: Option[ApplicationInfo] = None
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 23196bf1d..64276493a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}
 
 import scala.collection.JavaConverters._
 
-import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.EvictingQueue
 import org.apache.commons.lang3.StringUtils.containsIgnoreCase
 
@@ -164,9 +163,8 @@ trait ProcBuilder {
   // Visible for test
   @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
   private var logCaptureThread: Thread = _
-  private var process: Process = _
-  @VisibleForTesting
-  @volatile private[kyuubi] var processLaunched: Boolean = _
+  @volatile private[kyuubi] var process: Process = _
+  @volatile private[kyuubi] var processLaunched: Boolean = false
 
   private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
     val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -204,7 +202,7 @@ trait ProcBuilder {
     file
   }
 
-  def validateConf: Unit = {}
+  def validateConf(): Unit = {}
 
   final def start: Process = synchronized {
     process = processBuilder.start()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 972284f5c..f991d9f2b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -295,7 +295,7 @@ class SparkProcessBuilder(
     conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
   }
 
-  override def validateConf: Unit = Validator.validateConf(conf)
+  override def validateConf(): Unit = Validator.validateConf(conf)
 
   // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
   def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit = {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 3bb17e1aa..b8795ef61 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -100,6 +100,9 @@ class BatchJobSubmission(
       getOperationLog)
   }
 
+  def startupProcessAlive: Boolean =
+    builder.processLaunched && Option(builder.process).exists(_.isAlive)
+
   override def currentApplicationInfo(): Option[ApplicationInfo] = {
     if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
       return _applicationInfo
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index 2bfbbce2a..e2736267f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -94,8 +94,13 @@ class KyuubiBatchService(
                   metadata.appState match {
                     // app that is not submitted to resource manager
                     case None | Some(ApplicationState.NOT_FOUND) => false
-                    // app that is pending in resource manager
-                    case Some(ApplicationState.PENDING) => false
+                    // app that is pending in resource manager while the local startup
+                    // process is alive. For example, in Spark YARN cluster mode, if set
+                    // spark.yarn.submit.waitAppCompletion=false, the local spark-submit
+                    // process exits immediately once Application goes ACCEPTED status,
+                    // even no resource could be allocated for the AM container.
+                    case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
+                      false
                     // not sure, added for safe
                     case Some(ApplicationState.UNKNOWN) => false
                     case _ => true
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 8489e6d30..6a9f81045 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -112,6 +112,8 @@ class KyuubiBatchSession(
       batchArgs,
       metadata)
 
+  def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive
+
   private def waitMetadataRequestsRetryCompletion(): Unit = {
     val batchId = batchJobSubmissionOp.batchId
     sessionManager.getMetadataRequestsRetryRef(batchId).foreach {