You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/03/21 16:09:05 UTC
[spark] branch master updated: [SPARK-42813][K8S] Print application info when waitAppCompletion is false
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ba1badd2adf [SPARK-42813][K8S] Print application info when waitAppCompletion is false
ba1badd2adf is described below
commit ba1badd2adfd175c6680dff90e14b8aaa6cecd78
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Mar 21 09:08:18 2023 -0700
[SPARK-42813][K8S] Print application info when waitAppCompletion is false
### What changes were proposed in this pull request?
On K8s cluster mode,
1. when `spark.kubernetes.submission.waitAppCompletion=false`, print the application information on `spark-submit` exit, as it did before [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174)
2. add `appId` in the output message
### Why are the changes needed?
On K8s cluster mode, when `spark.kubernetes.submission.waitAppCompletion=false`,
before [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), `spark-submit` would exit quickly after logging the basic application information:
```
logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
```
After [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), that part of the code is unreachable, so nothing is printed.
This PR also proposes adding `appId` to the output message, to make it consistent with the surrounding context (within `LoggingPodStatusWatcherImpl`, this message is something of an exception: `... application $appId ...` is used in other places) and with YARN.
https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1311-L1318
### Does this PR introduce _any_ user-facing change?
Yes. The changes are:
1) when `spark.kubernetes.submission.waitAppCompletion=false`, the user can see the app information when `spark-submit` exits.
2) the final `spark-submit` output now contains the app ID, which is consistent with the context and with other resource managers like YARN.
### How was this patch tested?
Pass CI.
Closes #40444 from pan3793/SPARK-42813.
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../k8s/submit/KubernetesClientApplication.scala | 9 +++++-
.../k8s/submit/LoggingPodStatusWatcher.scala | 12 ++++----
.../spark/deploy/k8s/submit/ClientSuite.scala | 32 ++++++++++++++++++++--
3 files changed, 43 insertions(+), 10 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 14d3c4d1f42..9f9b5655e26 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -180,8 +180,8 @@ private[spark] class Client(
throw e
}
+ val sId = Client.submissionId(conf.namespace, driverPodName)
if (conf.get(WAIT_FOR_APP_COMPLETION)) {
- val sId = Seq(conf.namespace, driverPodName).mkString(":")
breakable {
while (true) {
val podWithName = kubernetesClient
@@ -202,10 +202,17 @@ private[spark] class Client(
}
}
}
+ } else {
+ logInfo(s"Deployed Spark application ${conf.appName} with application ID ${conf.appId} " +
+ s"and submission ID $sId into Kubernetes")
}
}
}
+private[spark] object Client {
+ def submissionId(namespace: String, driverPodName: String): String = s"$namespace:$driverPodName"
+}
+
/**
* Main class and entry point of application submission in KUBERNETES mode.
*/
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 81d91457253..bc8b023b5ec 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -95,8 +95,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
this.notifyAll()
}
- override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
- logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
+ override def watchOrStop(sId: String): Boolean = {
+ logInfo(s"Waiting for application ${conf.appName} with application ID ${conf.appId} " +
+ s"and submission ID $sId to finish...")
val interval = conf.get(REPORT_INTERVAL)
synchronized {
while (!podCompleted && !resourceTooOldReceived) {
@@ -109,12 +110,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
logInfo(
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
.getOrElse("No containers were found in the driver pod."))
- logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+ logInfo(s"Application ${conf.appName} with application ID ${conf.appId} " +
+ s"and submission ID $sId finished")
}
podCompleted
- } else {
- logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
- // Always act like the application has completed since we don't want to wait for app completion
- true
}
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index a8c25ab5002..8c2be6c142d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -33,8 +33,10 @@ import org.scalatestplus.mockito.MockitoSugar._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{Config, _}
+import org.apache.spark.deploy.k8s.Config.WAIT_FOR_APP_COMPLETION
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.deploy.k8s.submit.Client.submissionId
import org.apache.spark.util.Utils
class ClientSuite extends SparkFunSuite with BeforeAndAfter {
@@ -181,7 +183,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods)
when(namedPods.create()).thenReturn(podWithOwnerReference())
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
- when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
+ val sId = submissionId(kconf.namespace, POD_NAME)
+ when(loggingPodStatusWatcher.watchOrStop(sId)).thenReturn(true)
doReturn(resourceList)
.when(kubernetesClient)
.resourceList(createdResourcesArgumentCaptor.capture())
@@ -343,6 +346,31 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesClient,
loggingPodStatusWatcher)
submissionClient.run()
- verify(loggingPodStatusWatcher).watchOrStop(kconf.namespace + ":driver")
+ verify(loggingPodStatusWatcher).watchOrStop(submissionId(kconf.namespace, POD_NAME))
+ }
+
+ test("SPARK-42813: Print application info when waitAppCompletion is false") {
+ val appName = "SPARK-42813"
+ val logAppender = new LogAppender
+ withLogAppender(logAppender) {
+ val sparkConf = new SparkConf(loadDefaults = false)
+ .set("spark.app.name", appName)
+ .set(WAIT_FOR_APP_COMPLETION, false)
+ kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf,
+ resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
+ when(driverBuilder.buildFromFeatures(kconf, kubernetesClient))
+ .thenReturn(BUILT_KUBERNETES_SPEC)
+ val submissionClient = new Client(
+ kconf,
+ driverBuilder,
+ kubernetesClient,
+ loggingPodStatusWatcher)
+ submissionClient.run()
+ }
+ val appId = KubernetesTestConf.APP_ID
+ val sId = submissionId(kconf.namespace, POD_NAME)
+ assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains(
+ s"Deployed Spark application $appName with application ID $appId " +
+ s"and submission ID $sId into Kubernetes"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org