You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2024/01/30 15:40:03 UTC
(kyuubi) branch branch-1.8 updated: [KYUUBI #6028] Exited spark-submit process should not block batch submit queue
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 6b12e31b6 [KYUUBI #6028] Exited spark-submit process should not block batch submit queue
6b12e31b6 is described below
commit 6b12e31b68640b20951b0336cd190ef15f3b34e5
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Jan 30 23:38:52 2024 +0800
[KYUUBI #6028] Exited spark-submit process should not block batch submit queue
While enabling batch implementation V2 with the following configurations
```
kyuubi.batch.impl.version=2
kyuubi.batch.submitter.enabled=true
kyuubi.batch.submitter.threads=48
spark.master=yarn
spark.submit.deployMode=cluster
spark.yarn.submit.waitAppCompletion=false
```
I found that the batch jobs will be blocked in the DB queue once a YARN queue has no resources, this brings an issue, the subsequential batch jobs that are going to be submitted to another YARN queue also be queued in DB, rather than YARN queue.
```
mysql> select state, engine_state, count(1) from metadata where state in ('INITIALIZED', 'PENDING', 'RUNNING') group by state, engine_state;
+-------------+--------------+----------+
| state | engine_state | count(1) |
+-------------+--------------+----------+
| INITIALIZED | NULL | 166 |
| PENDING | NULL | 1 |
| RUNNING | PENDING | 148 |
| RUNNING | RUNNING | 415 |
+-------------+--------------+----------+
```
The submitter queue whose size is controlled by `kyuubi.batch.submitter.threads` is designed to address the `spark-submit` process concurrency issue, too many `spark-submit` processes may run out of the Kyuubi server's node CPU/memory resources and eventually crash the service. For Spark YARN cluster mode, if set `spark.yarn.submit.waitAppCompletion=false`, the local `spark-submit` process exits immediately once the Application goes ACCEPTED status, even no resource could be allocated [...]
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
Pass GA, and roll out into internal cluster.
---
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6028 from pan3793/batch-submit.
Closes #6028
05fcc758f [Cheng Pan] Exited spark-submit process should not block batch submit queue
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
(cherry picked from commit 208354c327104be0549503f760ce61d376396025)
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala | 2 +-
.../src/main/scala/org/apache/kyuubi/engine/EngineRef.scala | 2 +-
.../src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala | 8 +++-----
.../org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala | 2 +-
.../scala/org/apache/kyuubi/operation/BatchJobSubmission.scala | 3 +++
.../main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala | 9 +++++++--
.../scala/org/apache/kyuubi/session/KyuubiBatchSession.scala | 2 ++
7 files changed, 18 insertions(+), 10 deletions(-)
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index ea804575e..79852a42b 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
val builder = new SparkProcessBuilder("test", conf)
- val e = intercept[KyuubiException](builder.validateConf)
+ val e = intercept[KyuubiException](builder.validateConf())
assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" +
s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" +
"working-with-objects/names/#dns-subdomain-names and the value length <= 237")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index e15645133..952ed1ef5 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -215,7 +215,7 @@ private[kyuubi] class EngineRef(
acquiredPermit = true
val redactedCmd = builder.toString
info(s"Launching engine:\n$redactedCmd")
- builder.validateConf
+ builder.validateConf()
val process = builder.start
var exitValue: Option[Int] = None
var lastApplicationInfo: Option[ApplicationInfo] = None
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 23196bf1d..64276493a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._
-import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
@@ -164,9 +163,8 @@ trait ProcBuilder {
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
private var logCaptureThread: Thread = _
- private var process: Process = _
- @VisibleForTesting
- @volatile private[kyuubi] var processLaunched: Boolean = _
+ @volatile private[kyuubi] var process: Process = _
+ @volatile private[kyuubi] var processLaunched: Boolean = false
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -204,7 +202,7 @@ trait ProcBuilder {
file
}
- def validateConf: Unit = {}
+ def validateConf(): Unit = {}
final def start: Process = synchronized {
process = processBuilder.start()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 972284f5c..f991d9f2b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -295,7 +295,7 @@ class SparkProcessBuilder(
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}
- override def validateConf: Unit = Validator.validateConf(conf)
+ override def validateConf(): Unit = Validator.validateConf(conf)
// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit = {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 3bb17e1aa..b8795ef61 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -100,6 +100,9 @@ class BatchJobSubmission(
getOperationLog)
}
+ def startupProcessAlive: Boolean =
+ builder.processLaunched && Option(builder.process).exists(_.isAlive)
+
override def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
return _applicationInfo
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index 2bfbbce2a..e2736267f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -94,8 +94,13 @@ class KyuubiBatchService(
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
- // app that is pending in resource manager
- case Some(ApplicationState.PENDING) => false
+ // app that is pending in resource manager while the local startup
+ // process is alive. For example, in Spark YARN cluster mode, if set
+ // spark.yarn.submit.waitAppCompletion=false, the local spark-submit
+ // process exits immediately once Application goes ACCEPTED status,
+ // even no resource could be allocated for the AM container.
+ case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
+ false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 8489e6d30..6a9f81045 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -112,6 +112,8 @@ class KyuubiBatchSession(
batchArgs,
metadata)
+ def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive
+
private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
sessionManager.getMetadataRequestsRetryRef(batchId).foreach {