Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/14 15:04:33 UTC
[kyuubi] branch branch-1.7 updated: [KYUUBI #4467][K8S][BATCH] Tolerate Driver Pod ephemerally invisible after submitting
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 07e63d913 [KYUUBI #4467][K8S][BATCH] Tolerate Driver Pod ephemerally invisible after submitting
07e63d913 is described below
commit 07e63d9134aa8d1f5258e3e9d0f5118c6075fe25
Author: zwangsheng <22...@qq.com>
AuthorDate: Tue Mar 14 23:01:43 2023 +0800
[KYUUBI #4467][K8S][BATCH] Tolerate Driver Pod ephemerally invisible after submitting
The following discussion assumes using Spark cluster mode w/ `waitCompletion=false`.
In Spark on YARN, the application is visible immediately after `spark-submit` returns. Spark on K8s behaves differently: the Driver Pod is ephemerally invisible after submitting, so NOT_FOUND is returned instead of UNKNOWN or PENDING.
To tolerate this case, `kyuubi.engine.submit.timeout` is introduced. Until that period has elapsed since submission, ApplicationManager reports UNKNOWN instead of NOT_FOUND while the Driver Pod is still being scheduled.
More detail in #4467
1. Remove the `JpsApplicationOperation` fallback from `KubernetesApplicationOperation`, which was used to handle client deploy mode (`YarnApplicationOperation` doesn't handle this either).
2. Add an engine submit timeout so that `KubernetesApplicationOperation` returns UNKNOWN status when the driver pod is not found within the timeout.
3. Pass the submit time when calling `getApplicationInfo`.
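The decision described in item 2 can be sketched in isolation as follows. This is a minimal illustration, not the committed code: `State`, `SubmitTimeoutSketch` and `stateWhenPodMissing` are hypothetical names, and parsing `PT30S` with `java.time.Duration` is an assumption about how the ISO-8601 default maps to milliseconds.

```scala
import java.time.Duration

// Hypothetical stand-in for the two ApplicationState values involved here.
object State extends Enumeration {
  val UNKNOWN, NOT_FOUND = Value
}

object SubmitTimeoutSketch {
  // "PT30S" is the default of kyuubi.engine.submit.timeout;
  // read as an ISO-8601 duration it is 30 seconds, i.e. 30000 ms.
  val submitTimeoutMs: Long = Duration.parse("PT30S").toMillis

  // State to report when no driver pod matches the tag.
  // Without a submit time there is nothing to tolerate, so NOT_FOUND is kept.
  def stateWhenPodMissing(
      submitTime: Option[Long],
      nowMs: Long,
      timeoutMs: Long = submitTimeoutMs): State.Value =
    submitTime match {
      // Still inside the tolerance window: the pod may simply not be visible yet.
      case Some(t) if nowMs - t <= timeoutMs => State.UNKNOWN
      // Window exceeded, or no submit time supplied.
      case _ => State.NOT_FOUND
    }
}
```

For example, `SubmitTimeoutSketch.stateWhenPodMissing(Some(1000L), 11000L)` falls inside the window and yields `State.UNKNOWN`, while passing `None` yields `State.NOT_FOUND` regardless of the clock.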
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4469 from zwangsheng/4467.
Closes #4467
562b67463 [zwangsheng] [KYUUBI #4467] Fix Setting.md
362c43d1b [zwangsheng] [KYUUBI #4467] Fix Setting.md
ac69f4d81 [zwangsheng] [KYUUBI #4467] Add Config Desc
d2b9fb660 [zwangsheng] [KYUUBI #4467] save tab
eac880fcf [zwangsheng] [KYUUBI #4467] Ingnore Kubernetes Operation for client mode test
7a20b97a4 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
aa4c7716a [zwangsheng] [KYUUBI #4467] Ingnore Kubernetes Operation for client mode test
c5bd888ab [zwangsheng] [KYUUBI #4467] note it test
a86dcefba [zwangsheng] [KYUUBI #4467] Using default none
aed7f8794 [Cheng Pan] Update docs/deployment/settings.md
490df7dc0 [zwangsheng] [KYUUBI #4467] fix complie
33f3a5be8 [zwangsheng] [KYUUBI #4467] fix comments
4745790cf [zwangsheng] [KYUUBI #4467] Fix IT Test
924cfe38e [zwangsheng] [KYUUBI #4467] Fix Setting.md
5f8aeaacc [zwangsheng] [KYUUBI #4467] KubernetesApplicationOperation Wait if not fount driver pod in limit time range
Lead-authored-by: zwangsheng <22...@qq.com>
Co-authored-by: Cheng Pan <pa...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
(cherry picked from commit b23c87c318a67445abb65252e547844471a65b3d)
Signed-off-by: Cheng Pan <ch...@apache.org>
---
docs/deployment/settings.md | 1 +
.../test/spark/SparkOnKubernetesTestsSuite.scala | 3 +-
.../org/apache/kyuubi/config/KyuubiConf.scala | 9 +++
.../kyuubi/engine/ApplicationOperation.scala | 3 +-
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 5 +-
.../kyuubi/engine/JpsApplicationOperation.scala | 2 +-
.../engine/KubernetesApplicationOperation.scala | 75 +++++++++++++---------
.../kyuubi/engine/KyuubiApplicationManager.scala | 5 +-
.../kyuubi/engine/YarnApplicationOperation.scala | 2 +-
.../kyuubi/operation/BatchJobSubmission.scala | 10 ++-
.../kyuubi/server/api/v1/BatchesResource.scala | 3 +-
11 files changed, 78 insertions(+), 40 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 1488b0ab1..840d7d518 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -160,6 +160,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. [...]
| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. [...]
| kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. [...]
+| kyuubi.engine.submit.timeout | PT30S | Period to tolerate the Driver Pod being ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately after `spark-submit` is returned. [...]
| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul> [...]
| kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine [...]
| kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine [...]
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 14db8b408..831504608 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -125,6 +125,7 @@ class SparkClusterModeOnKubernetesSuite
override protected def jdbcUrl: String = getJdbcUrl
}
+// [KYUUBI #4467] KubernetesApplicationOperator doesn't support client mode
class KyuubiOperationKubernetesClusterClientModeSuite
extends SparkClientModeOnKubernetesSuiteBase {
private lazy val k8sOperation: KubernetesApplicationOperation = {
@@ -136,7 +137,7 @@ class KyuubiOperationKubernetesClusterClientModeSuite
private def sessionManager: KyuubiSessionManager =
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
- test("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") {
+ ignore("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") {
val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 9b6a8645b..fa51af7ff 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2520,6 +2520,15 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)
+ val ENGINE_SUBMIT_TIMEOUT: ConfigEntry[Long] =
+ buildConf("kyuubi.engine.submit.timeout")
+ .doc("Period to tolerate the Driver Pod being ephemerally invisible after submitting. " +
+ "In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately " +
+ "after `spark-submit` is returned.")
+ .version("1.7.1")
+ .timeConf
+ .createWithDefaultString("PT30S")
+
/**
* Holds information about keys that have been deprecated.
*
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 93d495895..00db372ce 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -56,9 +56,10 @@ trait ApplicationOperation {
* Get the engine/application status by the unique application tag
*
* @param tag the unique application tag for engine instance.
+ * @param submitTime engine submit to resourceManager time
* @return [[ApplicationInfo]]
*/
- def getApplicationInfoByTag(tag: String): ApplicationInfo
+ def getApplicationInfoByTag(tag: String, submitTime: Option[Long] = None): ApplicationInfo
}
object ApplicationState extends Enumeration {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 84b7707e8..1f38ea3c2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -216,7 +216,10 @@ private[kyuubi] class EngineRef(
// check the engine application state from engine manager and fast fail on engine terminate
if (exitValue == Some(0)) {
Option(engineManager).foreach { engineMgr =>
- engineMgr.getApplicationInfo(builder.clusterManager(), engineRefId).foreach { appInfo =>
+ engineMgr.getApplicationInfo(
+ builder.clusterManager(),
+ engineRefId,
+ Some(started)).foreach { appInfo =>
if (ApplicationState.isTerminated(appInfo.state)) {
MetricsSystem.tracing { ms =>
ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index bd482b86b..ce2e05461 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -84,7 +84,7 @@ class JpsApplicationOperation extends ApplicationOperation {
killJpsApplicationByTag(tag, true)
}
- override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
+ override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
val commandOption = getEngine(tag)
if (commandOption.nonEmpty) {
val idAndCmd = commandOption.get
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index bee69b117..26e9202c7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -31,16 +31,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
@volatile
private var kubernetesClient: KubernetesClient = _
- private var jpsOperation: JpsApplicationOperation = _
- override def initialize(conf: KyuubiConf): Unit = {
- jpsOperation = new JpsApplicationOperation
- jpsOperation.initialize(conf)
+ private var submitTimeout: Long = _
+ override def initialize(conf: KyuubiConf): Unit = {
info("Start initializing Kubernetes Client.")
kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
case Some(client) =>
info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
+ submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
client
case None =>
warn("Fail to init Kubernetes Client for Kubernetes Application Operation")
@@ -49,6 +48,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
override def isSupported(clusterManager: Option[String]): Boolean = {
+ // TODO add deploy mode to check whether is supported
kubernetesClient != null && clusterManager.nonEmpty &&
clusterManager.get.toLowerCase.startsWith("k8s")
}
@@ -72,8 +72,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
s"Operation of deleted appId: ${podList.get(0).getMetadata.getName} is completed")
}
} else {
- // client mode
- jpsOperation.killApplicationByTag(tag)
+ (
+ false,
+ s"Target Pod(tag: $tag) is not found, due to pod have been deleted or not created")
}
} catch {
case e: Exception =>
@@ -84,33 +85,45 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
}
- override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
- if (kubernetesClient != null) {
- debug(s"Getting application info from Kubernetes cluster by $tag tag")
- try {
- val operation = findDriverPodByTag(tag)
- val podList = operation.list().getItems
- if (podList.size() != 0) {
- val pod = podList.get(0)
- val info = ApplicationInfo(
- // spark pods always tag label `spark-app-selector:<spark-app-id>`
- id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
- name = pod.getMetadata.getName,
- state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
- error = Option(pod.getStatus.getReason))
- debug(s"Successfully got application info by $tag: $info")
- info
- } else {
- // client mode
- jpsOperation.getApplicationInfoByTag(tag)
- }
- } catch {
- case e: Exception =>
- error(s"Failed to get application with $tag, due to ${e.getMessage}")
+ override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
+ if (kubernetesClient == null) {
+ throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+ }
+ debug(s"Getting application info from Kubernetes cluster by $tag tag")
+ try {
+ val operation = findDriverPodByTag(tag)
+ val podList = operation.list().getItems
+ if (podList.size() != 0) {
+ val pod = podList.get(0)
+ val info = ApplicationInfo(
+ // spark pods always tag label `spark-app-selector:<spark-app-id>`
+ id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
+ name = pod.getMetadata.getName,
+ state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
+ error = Option(pod.getStatus.getReason))
+ debug(s"Successfully got application info by $tag: $info")
+ return info
+ }
+ // Kyuubi should wait second if pod is not be created
+ submitTime match {
+ case Some(time) =>
+ val elapsedTime = System.currentTimeMillis() - time
+ if (elapsedTime > submitTimeout) {
+ error(s"Can't find target driver pod by tag: $tag, " +
+ s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
+ ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+ } else {
+ warn("Wait for driver pod to be created, " +
+ s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
+ ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN)
+ }
+ case None =>
ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
}
- } else {
- throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+ } catch {
+ case e: Exception =>
+ error(s"Failed to get application with $tag, due to ${e.getMessage}")
+ ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 70c130012..9b23e550d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -84,10 +84,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
def getApplicationInfo(
clusterManager: Option[String],
- tag: String): Option[ApplicationInfo] = {
+ tag: String,
+ submitTime: Option[Long] = None): Option[ApplicationInfo] = {
val operation = operations.find(_.isSupported(clusterManager))
operation match {
- case Some(op) => Some(op.getApplicationInfoByTag(tag))
+ case Some(op) => Some(op.getApplicationInfoByTag(tag, submitTime))
case None => None
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index b38b1daa2..e836e65da 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -75,7 +75,7 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
}
}
- override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
+ override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
if (yarnClient != null) {
debug(s"Getting application info from Yarn cluster by $tag tag")
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index f061d977d..883afcf58 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -105,8 +105,16 @@ class BatchJobSubmission(
override protected def currentApplicationInfo: Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
// only the ApplicationInfo with non-empty id is valid for the operation
+ val submitTime = if (_appStartTime <= 0) {
+ System.currentTimeMillis()
+ } else {
+ _appStartTime
+ }
val applicationInfo =
- applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null)
+ applicationManager.getApplicationInfo(
+ builder.clusterManager(),
+ batchId,
+ Some(submitTime)).filter(_.id != null)
applicationInfo.foreach { _ =>
if (_appStartTime <= 0) {
_appStartTime = System.currentTimeMillis()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 0ef3c8bac..4814996a4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -296,7 +296,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
error(s"Error redirecting get batch[$batchId] to ${metadata.kyuubiInstance}", e)
val batchAppStatus = sessionManager.applicationManager.getApplicationInfo(
metadata.clusterManager,
- batchId)
+ batchId,
+ Some(metadata.createTime))
buildBatch(metadata, batchAppStatus)
}
}