Posted to commits@spark.apache.org by va...@apache.org on 2019/03/26 19:08:03 UTC

[spark] branch master updated: [SPARK-24793][K8S] Enhance spark-submit for app management

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 05168e7  [SPARK-24793][K8S] Enhance spark-submit for app management
05168e7 is described below

commit 05168e725d2a17c4164ee5f9aa068801ec2454f4
Author: Stavros Kontopoulos <st...@lightbend.com>
AuthorDate: Tue Mar 26 11:50:23 2019 -0700

    [SPARK-24793][K8S] Enhance spark-submit for app management
    
    - Supports the `--kill` and `--status` flags.
    - Supports globs, which is useful in general; see this long-standing [issue](https://github.com/kubernetes/kubernetes/issues/17144#issuecomment-272052461) for kubectl. The general command shapes are sketched below.
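
    A sketch of the general command shapes (the master URL is illustrative; the submission ID format is `namespace:driver-pod-name`):

    ```
    ./bin/spark-submit --kill <namespace>:<driver-pod-name> --master k8s://https://<api-server>:<port>
    ./bin/spark-submit --status <namespace>:<driver-pod-name-or-glob> --master k8s://https://<api-server>:<port>
    ```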
    
    Tested manually against running apps. Example output:
    
    Submission ID reported at launch time:
    
    ```
    2019-01-20 23:47:56 INFO  Client:58 - Waiting for application spark-pi with submissionId spark:spark-pi-1548020873671-driver to finish...
    ```
    
    Killing the app:
    
    ```
    ./bin/spark-submit --kill spark:spark-pi-1548020873671-driver --master  k8s://https://192.168.2.8:8443
    2019-01-20 23:48:07 WARN  Utils:70 - Your hostname, universe resolves to a loopback address: 127.0.0.1; using 192.168.2.8 instead (on interface wlp2s0)
    2019-01-20 23:48:07 WARN  Utils:70 - Set SPARK_LOCAL_IP if you need to bind to another address
    
    ```
    
    The app terminates with exit code 143 (SIGTERM; since the image uses tini as its init process, this should lead to [graceful shutdown](https://cloud.google.com/solutions/best-practices-for-building-containers)):
    
    ```
    2019-01-20 23:48:08 INFO  LoggingPodStatusWatcherImpl:58 - State changed, new state:
    	 pod name: spark-pi-1548020873671-driver
    	 namespace: spark
    	 labels: spark-app-selector -> spark-e4730c80e1014b72aa77915a2203ae05, spark-role -> driver
    	 pod uid: 0ba9a794-1cfd-11e9-8215-a434d9270a65
    	 creation time: 2019-01-20T21:47:55Z
    	 service account name: spark-sa
    	 volumes: spark-local-dir-1, spark-conf-volume, spark-sa-token-b7wcm
    	 node name: minikube
    	 start time: 2019-01-20T21:47:55Z
    	 phase: Running
    	 container status:
    		 container name: spark-kubernetes-driver
    		 container image: skonto/spark:k8s-3.0.0
    		 container state: running
    		 container started at: 2019-01-20T21:48:00Z
    2019-01-20 23:48:09 INFO  LoggingPodStatusWatcherImpl:58 - State changed, new state:
    	 pod name: spark-pi-1548020873671-driver
    	 namespace: spark
    	 labels: spark-app-selector -> spark-e4730c80e1014b72aa77915a2203ae05, spark-role -> driver
    	 pod uid: 0ba9a794-1cfd-11e9-8215-a434d9270a65
    	 creation time: 2019-01-20T21:47:55Z
    	 service account name: spark-sa
    	 volumes: spark-local-dir-1, spark-conf-volume, spark-sa-token-b7wcm
    	 node name: minikube
    	 start time: 2019-01-20T21:47:55Z
    	 phase: Failed
    	 container status:
    		 container name: spark-kubernetes-driver
    		 container image: skonto/spark:k8s-3.0.0
    		 container state: terminated
    		 container started at: 2019-01-20T21:48:00Z
    		 container finished at: 2019-01-20T21:48:08Z
    		 exit code: 143
    		 termination reason: Error
    2019-01-20 23:48:09 INFO  LoggingPodStatusWatcherImpl:58 - Container final statuses:
    	 container name: spark-kubernetes-driver
    	 container image: skonto/spark:k8s-3.0.0
    	 container state: terminated
    	 container started at: 2019-01-20T21:48:00Z
    	 container finished at: 2019-01-20T21:48:08Z
    	 exit code: 143
    	 termination reason: Error
    2019-01-20 23:48:09 INFO  Client:58 - Application spark-pi with submissionId spark:spark-pi-1548020873671-driver finished.
    2019-01-20 23:48:09 INFO  ShutdownHookManager:58 - Shutdown hook called
    2019-01-20 23:48:09 INFO  ShutdownHookManager:58 - Deleting directory /tmp/spark-f114b2e0-5605-4083-9203-a4b1c1f6059e
    
    ```
    
    Glob scenario:
    
    ```
    ./bin/spark-submit --status spark:spark-pi* --master  k8s://https://192.168.2.8:8443
    2019-01-20 22:27:44 WARN  Utils:70 - Your hostname, universe resolves to a loopback address: 127.0.0.1; using 192.168.2.8 instead (on interface wlp2s0)
    2019-01-20 22:27:44 WARN  Utils:70 - Set SPARK_LOCAL_IP if you need to bind to another address
    Application status (driver):
    	 pod name: spark-pi-1547948600328-driver
    	 namespace: spark
    	 labels: spark-app-selector -> spark-f13f01702f0b4503975ce98252d59b94, spark-role -> driver
    	 pod uid: c576e1c6-1c54-11e9-8215-a434d9270a65
    	 creation time: 2019-01-20T01:43:22Z
    	 service account name: spark-sa
    	 volumes: spark-local-dir-1, spark-conf-volume, spark-sa-token-b7wcm
    	 node name: minikube
    	 start time: 2019-01-20T01:43:22Z
    	 phase: Running
    	 container status:
    		 container name: spark-kubernetes-driver
    		 container image: skonto/spark:k8s-3.0.0
    		 container state: running
    		 container started at: 2019-01-20T01:43:27Z
    Application status (driver):
    	 pod name: spark-pi-1547948792539-driver
    	 namespace: spark
    	 labels: spark-app-selector -> spark-006d252db9b24f25b5069df357c30264, spark-role -> driver
    	 pod uid: 38375b4b-1c55-11e9-8215-a434d9270a65
    	 creation time: 2019-01-20T01:46:35Z
    	 service account name: spark-sa
    	 volumes: spark-local-dir-1, spark-conf-volume, spark-sa-token-b7wcm
    	 node name: minikube
    	 start time: 2019-01-20T01:46:35Z
    	 phase: Succeeded
    	 container status:
    		 container name: spark-kubernetes-driver
    		 container image: skonto/spark:k8s-3.0.0
    		 container state: terminated
    		 container started at: 2019-01-20T01:46:39Z
    		 container finished at: 2019-01-20T01:46:56Z
    		 exit code: 0
    		 termination reason: Completed
    
    ```
    
    Closes #23599 from skonto/submit_ops_extension.
    
    Authored-by: Stavros Kontopoulos <st...@lightbend.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      |  61 ++++++-
 .../apache/spark/deploy/SparkSubmitArguments.scala |  17 +-
 .../spark/deploy/rest/RestSubmissionClient.scala   |   9 +-
 .../org/apache/spark/util/CommandLineUtils.scala   |   8 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |  17 ++
 docs/running-on-kubernetes.md                      |  67 ++++++--
 resource-managers/kubernetes/core/pom.xml          |   6 +
 .../org.apache.spark.deploy.SparkSubmitOperation   |   1 +
 .../scala/org/apache/spark/deploy/k8s/Config.scala |   7 +
 .../spark/deploy/k8s/submit/K8sSubmitOps.scala     | 188 +++++++++++++++++++++
 .../k8s/submit/KubernetesClientApplication.scala   |   9 +-
 .../spark/deploy/k8s/submit/K8sSubmitOpSuite.scala | 156 +++++++++++++++++
 12 files changed, 504 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b4d7462..9efaaa7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,9 +22,10 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
 import java.net.{URI, URL}
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
-import java.util.UUID
+import java.util.{ServiceLoader, UUID}
 
 import scala.annotation.tailrec
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.{Properties, Try}
 
@@ -96,20 +97,35 @@ private[spark] class SparkSubmit extends Logging {
   }
 
   /**
-   * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
+   * Kill an existing submission.
    */
   private def kill(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient(args.master)
-      .killSubmission(args.submissionToKill)
+    if (RestSubmissionClient.supportsRestClient(args.master)) {
+      new RestSubmissionClient(args.master)
+        .killSubmission(args.submissionToKill)
+    } else {
+      val sparkConf = args.toSparkConf()
+      sparkConf.set("spark.master", args.master)
+      SparkSubmitUtils
+        .getSubmitOperations(args.master)
+        .kill(args.submissionToKill, sparkConf)
+    }
   }
 
   /**
-   * Request the status of an existing submission using the REST protocol.
-   * Standalone and Mesos cluster mode only.
+   * Request the status of an existing submission.
    */
   private def requestStatus(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient(args.master)
-      .requestSubmissionStatus(args.submissionToRequestStatusFor)
+    if (RestSubmissionClient.supportsRestClient(args.master)) {
+      new RestSubmissionClient(args.master)
+        .requestSubmissionStatus(args.submissionToRequestStatusFor)
+    } else {
+      val sparkConf = args.toSparkConf()
+      sparkConf.set("spark.master", args.master)
+      SparkSubmitUtils
+        .getSubmitOperations(args.master)
+        .printSubmissionStatus(args.submissionToRequestStatusFor, sparkConf)
+    }
   }
 
   /** Print version information to the log. */
@@ -320,7 +336,8 @@ private[spark] class SparkSubmit extends Logging {
       }
     }
 
-    args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
+    // update spark config from args
+    args.toSparkConf(Option(sparkConf))
     val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
     val targetDir = Utils.createTempDir()
 
@@ -1336,6 +1353,23 @@ private[spark] object SparkSubmitUtils {
     }
   }
 
+  private[deploy] def getSubmitOperations(master: String): SparkSubmitOperation = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoaders =
+      ServiceLoader.load(classOf[SparkSubmitOperation], loader)
+        .asScala
+        .filter(_.supports(master))
+
+    serviceLoaders.size match {
+      case x if x > 1 =>
+        throw new SparkException(s"Multiple($x) external SparkSubmitOperations " +
+          s"clients registered for master url ${master}.")
+      case 1 => serviceLoaders.headOption.get
+      case _ =>
+        throw new IllegalArgumentException(s"No external SparkSubmitOperations " +
+          s"clients found for master url: '$master'")
+    }
+  }
 }
 
 /**
@@ -1348,3 +1382,12 @@ private case class OptionAssigner(
     deployMode: Int,
     clOption: String = null,
     confKey: String = null)
+
+private[spark] trait SparkSubmitOperation {
+
+  def kill(submissionId: String, conf: SparkConf): Unit
+
+  def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit
+
+  def supports(master: String): Boolean
+}
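
For context, the new `SparkSubmitOperation` trait above is discovered through `ServiceLoader`, so a cluster-manager module can plug in its own `--kill`/`--status` handling. Below is a minimal sketch of what such a plug-in could look like; the `foo://` master prefix and `FooSubmitOperation` class are invented for illustration, and since the trait is `private[spark]` the implementation has to live under the `org.apache.spark` package, just like the K8s one added by this commit:

```scala
package org.apache.spark.deploy.foo

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation
import org.apache.spark.util.CommandLineLoggingUtils

// Hypothetical handler for a fictional "foo://" cluster manager. It would be
// registered by listing its fully qualified name in
// META-INF/services/org.apache.spark.deploy.SparkSubmitOperation,
// exactly as the K8s module does further down in this commit.
private[spark] class FooSubmitOperation extends SparkSubmitOperation
  with CommandLineLoggingUtils {

  override def kill(submissionId: String, conf: SparkConf): Unit = {
    // A real implementation would call the cluster manager's API here.
    printMessage(s"Would kill $submissionId on ${conf.get("spark.master")}")
  }

  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
    // Likewise, this would query and print the remote submission state.
    printMessage(s"Would print the status of $submissionId")
  }

  // SparkSubmitUtils.getSubmitOperations selects the single registered
  // implementation whose supports() returns true for the given master URL.
  override def supports(master: String): Boolean = master.startsWith("foo://")
}
```
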
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index e7954d1..ed1324b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.io.Source
 import scala.util.Try
 
-import org.apache.spark.{SparkException, SparkUserAppException}
+import org.apache.spark.{SparkConf, SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
@@ -305,19 +305,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }
 
   private def validateKillArguments(): Unit = {
-    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      error("Killing submissions is only supported in standalone or Mesos mode!")
-    }
     if (submissionToKill == null) {
       error("Please specify a submission to kill.")
     }
   }
 
   private def validateStatusRequestArguments(): Unit = {
-    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      error(
-        "Requesting submission statuses is only supported in standalone or Mesos mode!")
-    }
     if (submissionToRequestStatusFor == null) {
       error("Please specify a submission to request status for.")
     }
@@ -574,6 +567,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |
         | Spark standalone or Mesos with cluster deploy mode only:
         |  --supervise                 If given, restarts the driver on failure.
+        |
+        | Spark standalone, Mesos or K8s with cluster deploy mode only:
         |  --kill SUBMISSION_ID        If given, kills the driver specified.
         |  --status SUBMISSION_ID      If given, requests the status of the driver specified.
         |
@@ -662,4 +657,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 
   private def error(msg: String): Unit = throw new SparkException(msg)
 
+  private[deploy] def toSparkConf(sparkConf: Option[SparkConf] = None): SparkConf = {
+    // either use an existing config or create a new empty one
+    sparkProperties.foldLeft(sparkConf.getOrElse(new SparkConf())) {
+      case (conf, (k, v)) => conf.set(k, v)
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index afa413f..1648ba5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -61,8 +61,6 @@ import org.apache.spark.util.Utils
 private[spark] class RestSubmissionClient(master: String) extends Logging {
   import RestSubmissionClient._
 
-  private val supportedMasterPrefixes = Seq("spark://", "mesos://")
-
   private val masters: Array[String] = if (master.startsWith("spark://")) {
     Utils.parseStandaloneMasterUrls(master)
   } else {
@@ -409,6 +407,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
 
 private[spark] object RestSubmissionClient {
 
+  val supportedMasterPrefixes = Seq("spark://", "mesos://")
+
   // SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong
   // on the remote machine (SPARK-12345) (SPARK-25934)
   private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
@@ -424,6 +424,10 @@ private[spark] object RestSubmissionClient {
       (k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
     }
   }
+
+  private[spark] def supportsRestClient(master: String): Boolean = {
+    supportedMasterPrefixes.exists(master.startsWith)
+  }
 }
 
 private[spark] class RestSubmissionClientApp extends SparkApplication {
@@ -456,5 +460,4 @@ private[spark] class RestSubmissionClientApp extends SparkApplication {
     val env = RestSubmissionClient.filterSystemEnvironment(sys.env)
     run(appResource, mainClass, appArgs, conf, env)
   }
-
 }
diff --git a/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
index 4b6602b..add1146 100644
--- a/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
@@ -25,8 +25,12 @@ import org.apache.spark.SparkException
  * Contains basic command line parsing functionality and methods to parse some common Spark CLI
  * options.
  */
-private[spark] trait CommandLineUtils {
+private[spark] trait CommandLineUtils extends CommandLineLoggingUtils {
 
+  def main(args: Array[String]): Unit
+}
+
+private[spark] trait CommandLineLoggingUtils {
   // Exposed for testing
   private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
 
@@ -41,6 +45,4 @@ private[spark] trait CommandLineUtils {
     printMessage("Run with --help for usage help or --verbose for debug output")
     exitFn(1)
   }
-
-  def main(args: Array[String]): Unit
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 3051d65..641751f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1239,6 +1239,23 @@ class SparkSubmitSuite
 
     conf.get(nonDelimSpaceFromFile._1) should be ("blah")
   }
+
+  test("get a Spark configuration from arguments") {
+    val testConf = "spark.test.hello" -> "world"
+    val masterConf = "spark.master" -> "yarn"
+    val clArgs = Seq(
+      "--conf", s"${testConf._1}=${testConf._2}",
+      "--conf", s"${masterConf._1}=${masterConf._2}",
+      "--class", "Foo",
+      "app.jar")
+    val conf = new SparkSubmitArguments(clArgs).toSparkConf()
+     Seq(
+       testConf,
+       masterConf
+     ).foreach { case (k, v) =>
+       conf.get(k) should be (v)
+     }
+  }
 }
 
 object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 40ff62c..7e26d77 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -62,7 +62,7 @@ logs and remains in "completed" state in the Kubernetes API until it's eventuall
 
 Note that in the completed state, the driver pod does *not* use any computational or memory resources.
 
-The driver and executor pod scheduling is handled by Kubernetes. Communication to the Kubernetes API is done via fabric8, and we are 
+The driver and executor pod scheduling is handled by Kubernetes. Communication to the Kubernetes API is done via fabric8, and we are
 currently running <code>kubernetes-client</code> version <code>4.1.0</code>. Make sure that when you are making infrastructure additions that you are aware of said version. It is possible to schedule the
 driver and executor pods on a subset of available nodes through a [node selector](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector)
 using the configuration property for it. It will be possible to use more advanced
@@ -89,7 +89,7 @@ $ ./bin/docker-image-tool.sh -r <repo> -t my-tag push
 ```
 This will build using the projects provided default `Dockerfiles`. To see more options available for customising the behaviour of this tool, including providing custom `Dockerfiles`, please run with the `-h` flag.
 
-By default `bin/docker-image-tool.sh` builds docker image for running JVM jobs. You need to opt-in to build additional 
+By default `bin/docker-image-tool.sh` builds docker image for running JVM jobs. You need to opt-in to build additional
 language binding docker images.
 
 Example usage is
@@ -250,7 +250,7 @@ To mount a volume of any of the types above into the driver pod, use the followi
 --conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path=<mount path>
 --conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly=<true|false>
 --conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath=<mount subPath>
-``` 
+```
 
 Specifically, `VolumeType` can be one of the following values: `hostPath`, `emptyDir`, and `persistentVolumeClaim`. `VolumeName` is the name you want to use for the volume under the `volumes` field in the pod specification.
 
@@ -258,7 +258,7 @@ Each supported type of volumes may have some specific configuration options, whi
 
 ```
 spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName]=<value>
-``` 
+```
 
 For example, the claim name of a `persistentVolumeClaim` with volume name `checkpointpvc` can be specified using the following property:
 
@@ -266,7 +266,7 @@ For example, the claim name of a `persistentVolumeClaim` with volume name `check
 spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=check-point-pvc-claim
 ```
 
-The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below. 
+The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below.
 
 ## Local Storage
 
@@ -403,6 +403,36 @@ RBAC authorization and how to configure Kubernetes service accounts for pods, pl
 [Using RBAC Authorization](https://kubernetes.io/docs/admin/authorization/rbac/) and
 [Configure Service Accounts for Pods](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/).
 
+## Spark Application Management
+
+Kubernetes provides simple application management via the spark-submit CLI tool in cluster mode.
+Users can kill a job by providing the submission ID that is printed when the job is submitted.
+The submission ID follows the format ``namespace:driver-pod-name``.
+If the user omits the namespace, the namespace set in the current k8s context is used.
+For example, if the user has set a specific namespace as follows: `kubectl config set-context minikube --namespace=spark`,
+then the `spark` namespace is used by default. On the other hand, if no namespace is added to the specific context,
+then all namespaces are considered by default. That means operations affect all Spark applications matching the given submission ID regardless of namespace.
+Moreover, spark-submit for application management uses the same backend code as the driver submission, so the same properties,
+such as `spark.kubernetes.context`, can be re-used.
+
+For example:
+```bash
+$ spark-submit --kill spark:spark-pi-1547948636094-driver --master k8s://https://192.168.2.8:8443
+```
+Users can also check the application status by using the `--status` flag:
+
+```bash
+$ spark-submit --status spark:spark-pi-1547948636094-driver --master k8s://https://192.168.2.8:8443
+```
+Both operations support glob patterns. For example, a user can run:
+```bash
+$ spark-submit --kill spark:spark-pi* --master k8s://https://192.168.2.8:8443
+```
+The above will kill all applications with the given prefix.
+
+Users can specify the grace period for pod termination via the `spark.kubernetes.appKillPodDeletionGracePeriod` property,
+provided with `--conf` (the default value for all K8s pods is <a href="https://kubernetes.io/docs/concepts/workloads/pods/pod">30 seconds</a>).
+
 ## Future Work
 
 There are several Spark on Kubernetes features that are currently being worked on or planned to be worked on. Those features are expected to eventually make it into future versions of the spark-kubernetes integration.
@@ -411,7 +441,6 @@ Some of these include:
 
 * Dynamic Resource Allocation and External Shuffle Service
 * Local File Dependency Management
-* Spark Application Management
 * Job Queues and Resource Management
 
 # Configuration
@@ -426,10 +455,10 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.kubernetes.context</code></td>
   <td><code>(none)</code></td>
   <td>
-    The context from the user Kubernetes configuration file used for the initial 
+    The context from the user Kubernetes configuration file used for the initial
     auto-configuration of the Kubernetes client library.  When not specified then
-    the users current context is used.  <strong>NB:</strong> Many of the 
-    auto-configured settings can be overridden by the use of other Spark 
+    the users current context is used.  <strong>NB:</strong> Many of the
+    auto-configured settings can be overridden by the use of other Spark
     configuration properties e.g. <code>spark.kubernetes.namespace</code>.
   </td>
 </tr>
@@ -762,7 +791,7 @@ See the [configuration page](configuration.html) for information on Spark config
   <td>
     Specify the cpu request for each executor pod. Values conform to the Kubernetes <a href="https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu">convention</a>.
     Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in <a href="https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units">CPU units</a>.
-    This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task 
+    This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task
     parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
   </td>
 </tr>
@@ -900,14 +929,14 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>0.1</code></td>
   <td>
     This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, and various systems processes. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs.
-    This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This prempts this error with a higher default. 
+    This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This prempts this error with a higher default.
   </td>
 </tr>
 <tr>
   <td><code>spark.kubernetes.pyspark.pythonVersion</code></td>
   <td><code>"3"</code></td>
   <td>
-   This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3. 
+   This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3.
   </td>
 </tr>
 <tr>
@@ -931,7 +960,7 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.kubernetes.hadoop.configMapName</code></td>
   <td><code>(none)</code></td>
   <td>
-    Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, to be mounted on the driver 
+    Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, to be mounted on the driver
     and executors for custom Hadoop configuration.
   </td>
 </tr>
@@ -940,14 +969,14 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>(none)</code></td>
   <td>
     Specify the name of the secret where your existing delegation tokens are stored. This removes the need for the job user
-    to provide any kerberos credentials for launching a job. 
+    to provide any kerberos credentials for launching a job.
   </td>
 </tr>
 <tr>
   <td><code>spark.kubernetes.kerberos.tokenSecret.itemKey</code></td>
   <td><code>(none)</code></td>
   <td>
-    Specify the item key of the data where your existing delegation tokens are stored. This removes the need for the job user 
+    Specify the item key of the data where your existing delegation tokens are stored. This removes the need for the job user
     to provide any kerberos credentials for launching a job.
   </td>
 </tr>
@@ -1018,6 +1047,13 @@ See the [configuration page](configuration.html) for information on Spark config
     Request timeout in milliseconds for the kubernetes client in driver to use when requesting executors.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.appKillPodDeletionGracePeriod</code></td>
+  <td>(none)</td>
+  <td>
+  Specify the grace period in seconds when deleting a Spark application using spark-submit.
+  </td>
+</tr>
 </table>
 
 #### Pod template properties
@@ -1155,7 +1191,6 @@ The following affect the driver and executor containers. All other containers in
     The cpu limits are set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The cpu is set by
     <code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
     <code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>.
-
   </td>
 </tr>
 <tr>
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 23106cb..85fd613 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -54,6 +54,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>io.fabric8</groupId>
       <artifactId>kubernetes-client</artifactId>
       <version>${kubernetes.client.version}</version>
diff --git a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkSubmitOperation b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkSubmitOperation
new file mode 100644
index 0000000..d589e6b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkSubmitOperation
@@ -0,0 +1 @@
+org.apache.spark.deploy.k8s.submit.K8SSparkSubmitOperation
\ No newline at end of file
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 83b5a75..b33b125 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -325,6 +325,13 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val KUBERNETES_SUBMIT_GRACE_PERIOD =
+    ConfigBuilder("spark.kubernetes.appKillPodDeletionGracePeriod")
+      .doc("Time to wait for graceful deletion of Spark pods when spark-submit" +
+        " is used for killing an application.")
+      .timeConf(TimeUnit.SECONDS)
+      .createOptional
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
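
For illustration, the new option could be passed together with `--kill` roughly as follows (a sketch only; the API server address, pod name, and the 10-second value are illustrative):

```bash
./bin/spark-submit --kill spark:spark-pi-1548020873671-driver \
  --conf spark.kubernetes.appKillPodDeletionGracePeriod=10 \
  --master k8s://https://<api-server>:<port>
```
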
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
new file mode 100644
index 0000000..d45c067
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import scala.collection.JavaConverters._
+
+import K8SSparkSubmitOperation.getGracePeriod
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodList}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkSubmitOperation
+import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.Config.{KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, KUBERNETES_SUBMIT_GRACE_PERIOD}
+import org.apache.spark.deploy.k8s.Constants.{SPARK_POD_DRIVER_ROLE, SPARK_ROLE_LABEL}
+import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
+import org.apache.spark.util.{CommandLineLoggingUtils, Utils}
+
+private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
+  type NON_NAMESPACED_PODS =
+    NonNamespaceOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+  def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
+      (implicit client: KubernetesClient): Unit
+  def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
+      (implicit client: KubernetesClient): Unit
+  def listPodsInNameSpace(namespace: Option[String])
+      (implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
+    namespace match {
+      case Some(ns) => client.pods.inNamespace(ns)
+      case None => client.pods
+    }
+  }
+}
+
+private class KillApplication extends K8sSubmitOp  {
+  override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
+      (implicit client: KubernetesClient): Unit = {
+    val podToDelete = listPodsInNameSpace(namespace).withName(pName)
+
+    if (Option(podToDelete).isDefined) {
+      getGracePeriod(sparkConf) match {
+        case Some(period) => podToDelete.withGracePeriod(period).delete()
+        case _ => podToDelete.delete()
+      }
+    } else {
+      printMessage("Application not found.")
+    }
+  }
+
+  override def executeOnGlob(pods: List[Pod], namespace: Option[String], sparkConf: SparkConf)
+      (implicit client: KubernetesClient): Unit = {
+    if (pods.nonEmpty) {
+      pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
+      val listedPods = listPodsInNameSpace(namespace)
+
+      getGracePeriod(sparkConf) match {
+        case Some(period) =>
+          // this is not using the batch api because no option is provided
+          // when using the grace period.
+          pods.foreach { pod =>
+            listedPods
+              .withName(pod.getMetadata.getName)
+              .withGracePeriod(period)
+              .delete()
+          }
+        case _ => listedPods.delete(pods.asJava)
+      }
+    } else {
+      printMessage("No applications found.")
+    }
+  }
+}
+
+private class ListStatus extends K8sSubmitOp {
+  override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
+      (implicit client: KubernetesClient): Unit = {
+    val pod = listPodsInNameSpace(namespace).withName(pName).get()
+    if (Option(pod).isDefined) {
+      printMessage("Application status (driver): " +
+        Option(pod).map(formatPodState).getOrElse("unknown."))
+    } else {
+      printMessage("Application not found.")
+    }
+  }
+
+  override def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
+      (implicit client: KubernetesClient): Unit = {
+    if (pods.nonEmpty) {
+      for (pod <- pods) {
+        printMessage("Application status (driver): " +
+          Option(pod).map(formatPodState).getOrElse("unknown."))
+      }
+    } else {
+      printMessage("No applications found.")
+    }
+  }
+}
+
+private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
+  with CommandLineLoggingUtils {
+
+  private def isGlob(name: String): Boolean = {
+    name.last == '*'
+  }
+
+  def execute(submissionId: String, sparkConf: SparkConf, op: K8sSubmitOp): Unit = {
+    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
+    submissionId.split(":", 2) match {
+      case Array(part1, part2@_*) =>
+        val namespace = if (part2.isEmpty) None else Some(part1)
+        val pName = if (part2.isEmpty) part1 else part2.headOption.get
+        Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
+          master,
+          namespace,
+          KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
+          SparkKubernetesClientFactory.ClientType.Submission,
+          sparkConf,
+          None,
+          None)
+        ) { kubernetesClient =>
+          implicit val client: KubernetesClient = kubernetesClient
+          if (isGlob(pName)) {
+            val ops = namespace match {
+              case Some(ns) =>
+                kubernetesClient
+                  .pods
+                  .inNamespace(ns)
+              case None =>
+                kubernetesClient
+                  .pods
+            }
+            val pods = ops
+              .list()
+              .getItems
+              .asScala
+              .filter { pod =>
+                val meta = pod.getMetadata
+                meta.getName.startsWith(pName.stripSuffix("*")) &&
+                  meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
+              }.toList
+            op.executeOnGlob(pods, namespace, sparkConf)
+          } else {
+            op.executeOnPod(pName, namespace, sparkConf)
+          }
+        }
+      case _ =>
+        printErrorAndExit(s"Submission ID: {$submissionId} is invalid.")
+    }
+  }
+
+  override def kill(submissionId: String, conf: SparkConf): Unit = {
+    printMessage(s"Submitting a request to kill submission " +
+      s"${submissionId} in ${conf.get("spark.master")}. " +
+      s"Grace period in secs: ${getGracePeriod(conf).getOrElse("not set.")}")
+    execute(submissionId, conf, new KillApplication)
+  }
+
+  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
+    printMessage(s"Submitting a request for the status of submission" +
+      s" ${submissionId} in ${conf.get("spark.master")}.")
+    execute(submissionId, conf, new ListStatus)
+  }
+
+  override def supports(master: String): Boolean = {
+    master.startsWith("k8s://")
+  }
+}
+
+private object K8SSparkSubmitOperation {
+  def getGracePeriod(sparkConf: SparkConf): Option[Long] = {
+    sparkConf.get(KUBERNETES_SUBMIT_GRACE_PERIOD)
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 92d5176..11bbad9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -141,12 +141,15 @@ private[spark] class Client(
           throw e
       }
 
+      val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" +
+        s"${resolvedDriverPod.getMetadata.getName}"
       if (waitForAppCompletion) {
-        logInfo(s"Waiting for application ${conf.appName} to finish...")
+        logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...")
         watcher.awaitCompletion()
-        logInfo(s"Application ${conf.appName} finished.")
+        logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.")
       } else {
-        logInfo(s"Deployed Spark application ${conf.appName} into Kubernetes.")
+        logInfo(s"Deployed Spark application ${conf.appName} with " +
+          s"submission ID ${sId} into Kubernetes.")
       }
     }
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala
new file mode 100644
index 0000000..d8be132
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.PrintStream
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+import org.mockito.{ArgumentMatchers, Mock, MockitoAnnotations}
+import org.mockito.Mockito.{times, verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_SUBMIT_GRACE_PERIOD
+import org.apache.spark.deploy.k8s.Constants.{SPARK_APP_ID_LABEL, SPARK_POD_DRIVER_ROLE, SPARK_ROLE_LABEL}
+import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
+
+class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter {
+  private val driverPodName1 = "driver1"
+  private val driverPodName2 = "driver2"
+  private val driverPod1 = buildDriverPod(driverPodName1, "1")
+  private val driverPod2 = buildDriverPod(driverPodName2, "2")
+  private val podList = List(driverPod1, driverPod2)
+  private val namespace = "test"
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var driverPodOperations1: PodResource[Pod, DoneablePod] = _
+
+  @Mock
+  private var driverPodOperations2: PodResource[Pod, DoneablePod] = _
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var err: PrintStream = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.inNamespace(namespace)).thenReturn(podOperations)
+    when(podOperations.delete(podList.asJava)).thenReturn(true)
+    when(podOperations.withName(driverPodName1)).thenReturn(driverPodOperations1)
+    when(podOperations.withName(driverPodName2)).thenReturn(driverPodOperations2)
+    when(driverPodOperations1.get).thenReturn(driverPod1)
+    when(driverPodOperations1.delete()).thenReturn(true)
+    when(driverPodOperations2.get).thenReturn(driverPod2)
+    when(driverPodOperations2.delete()).thenReturn(true)
+  }
+
+  test("List app status") {
+    implicit val kubeClient: KubernetesClient = kubernetesClient
+    val listStatus = new ListStatus
+    listStatus.printStream = err
+    listStatus.executeOnPod(driverPodName1, Option(namespace), new SparkConf())
+    // scalastyle:off
+    verify(err).println(ArgumentMatchers.eq(getPodStatus(driverPodName1, "1")))
+    // scalastyle:on
+  }
+
+  test("List status for multiple apps with glob") {
+    implicit val kubeClient: KubernetesClient = kubernetesClient
+    val listStatus = new ListStatus
+    listStatus.printStream = err
+    listStatus.executeOnGlob(podList, Option(namespace), new SparkConf())
+    // scalastyle:off
+    verify(err).println(ArgumentMatchers.eq(getPodStatus(driverPodName1, "1")))
+    verify(err).println(ArgumentMatchers.eq(getPodStatus(driverPodName2, "2")))
+    // scalastyle:on
+  }
+
+  test("Kill app") {
+    implicit val kubeClient: KubernetesClient = kubernetesClient
+    val killApp = new KillApplication
+    killApp.executeOnPod(driverPodName1, Option(namespace), new SparkConf())
+    verify(driverPodOperations1, times(1)).delete()
+  }
+
+  test("Kill app with gracePeriod") {
+    implicit val kubeClient: KubernetesClient = kubernetesClient
+    val killApp = new KillApplication
+    val conf = new SparkConf().set(KUBERNETES_SUBMIT_GRACE_PERIOD, 1L)
+    when(driverPodOperations1.withGracePeriod(1L)).thenReturn(driverPodOperations1)
+    killApp.executeOnPod(driverPodName1, Option(namespace), conf)
+    verify(driverPodOperations1, times(1)).withGracePeriod(1L)
+    verify(driverPodOperations1, times(1)).delete()
+  }
+
+  test("Kill multiple apps with glob without gracePeriod") {
+    implicit val kubeClient: KubernetesClient = kubernetesClient
+    val killApp = new KillApplication
+    killApp.printStream = err
+    killApp.executeOnGlob(podList, Option(namespace), new SparkConf())
+    verify(podOperations, times(1)).delete(podList.asJava)
+    // scalastyle:off
+    verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName1."))
+    verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName2."))
+    // scalastyle:on
+  }
+
+  private def buildDriverPod(podName: String, id: String): Pod = {
+    new PodBuilder()
+      .withNewMetadata()
+        .withName(podName)
+        .withNamespace(namespace)
+        .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+        .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+        .withUid(s"driver-pod-$id")
+      .endMetadata()
+      .withNewSpec()
+        .withServiceAccountName(s"test$id")
+        .withVolumes()
+        .withNodeName(s"testNode$id")
+      .endSpec()
+      .withNewStatus()
+        .withPhase("Running")
+      .endStatus()
+      .build()
+  }
+
+  private def getPodStatus(podName: String, id: String): String = {
+    "Application status (driver): " +
+      s"""|${"\n\t"} pod name: $podName
+          |${"\t"} namespace: N/A
+          |${"\t"} labels: spark-app-selector -> spark-app-id, spark-role -> driver
+          |${"\t"} pod uid: driver-pod-$id
+          |${"\t"} creation time: N/A
+          |${"\t"} service account name: test$id
+          |${"\t"} volumes: N/A
+          |${"\t"} node name: testNode$id
+          |${"\t"} start time: N/A
+          |${"\t"} phase: Running
+          |${"\t"} container status: N/A""".stripMargin
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org