Posted to commits@spark.apache.org by ej...@apache.org on 2019/05/22 23:16:41 UTC

[spark] branch master updated: [SPARK-23153][K8S] Support client dependencies with a Hadoop Compatible File System

This is an automated email from the ASF dual-hosted git repository.

eje pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e74570  [SPARK-23153][K8S] Support client dependencies with a Hadoop Compatible File System
5e74570 is described below

commit 5e74570c8f5e7dfc1ca1c53c177827c5cea57bf1
Author: Stavros Kontopoulos <st...@lightbend.com>
AuthorDate: Wed May 22 16:15:42 2019 -0700

    [SPARK-23153][K8S] Support client dependencies with a Hadoop Compatible File System
    
    ## What changes were proposed in this pull request?
    - Solves the current issue with `--packages` in cluster mode (there is no ticket for it). Also note some past [issues](https://issues.apache.org/jira/browse/SPARK-22657) when Hadoop libs are used on the spark-submit side.
    - Supports `spark.jars`, `spark.files`, and the app jar.
    
    It works as follows:
    spark-submit uploads the dependencies to the HCFS. Then the driver serves the dependencies via the Spark file server.
    No HCFS URIs are propagated to the executors.
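    
    For illustration, here is a minimal sketch of the submit-side upload step using the Hadoop `FileSystem` API directly (the bucket name and jar path below are placeholders, not values from this patch):
    
    ```scala
    import java.net.URI
    import java.util.UUID
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    
    object UploadSketch {
      def main(args: Array[String]): Unit = {
        // Placeholders for spark.kubernetes.file.upload.path and a local dependency.
        val uploadPath = "s3a://my-bucket/spark-uploads"
        val localJar = new Path("file:///tmp/my-app.jar")
    
        // spark-submit side: copy the local dependency into a per-submission
        // directory on the Hadoop compatible file system.
        val fs = FileSystem.get(new URI(uploadPath), new Configuration())
        val targetDir = new Path(s"$uploadPath/spark-upload-${UUID.randomUUID()}")
        fs.mkdirs(targetDir)
        fs.copyFromLocalFile(false, true, localJar, new Path(targetDir, localJar.getName))
    
        // Driver side (cluster mode): the uploaded URI is downloaded back to a local
        // directory and served via the Spark file server, so executors never see s3a://.
        println(s"Uploaded ${localJar.getName} to $targetDir")
      }
    }
    ```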
    
    The related design document is [here](https://docs.google.com/document/d/1peg_qVhLaAl4weo5C51jQicPwLclApBsdR1To2fgc48/edit). The next option to add is the RSS, but it has to be improved given the past discussion about it (Spark 2.3).
    ## How was this patch tested?
    
    - Ran the integration test suite.
    - Ran an example using S3:
    
    ```
     ./bin/spark-submit \
    ...
     --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6 \
     --deploy-mode cluster \
     --name spark-pi \
     --class org.apache.spark.examples.SparkPi \
     --conf spark.executor.memory=1G \
     --conf spark.kubernetes.namespace=spark \
     --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
     --conf spark.driver.memory=1G \
     --conf spark.executor.instances=2 \
     --conf spark.sql.streaming.metricsEnabled=true \
     --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
     --conf spark.kubernetes.container.image.pullPolicy=Always \
     --conf spark.kubernetes.container.image=skonto/spark:k8s-3.0.0 \
     --conf spark.kubernetes.file.upload.path=s3a://fdp-stavros-test \
     --conf spark.hadoop.fs.s3a.access.key=... \
     --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
     --conf spark.hadoop.fs.s3a.fast.upload=true \
     --conf spark.kubernetes.executor.deleteOnTermination=false \
     --conf spark.hadoop.fs.s3a.secret.key=... \
     --conf spark.files=client:///...resolv.conf \
    file:///my.jar **
    ```
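    Each submission creates its own `spark-upload-<uuid>` directory under `spark.kubernetes.file.upload.path`, so leftover uploads can be inspected or cleaned up afterwards. A small sketch using the same Hadoop API (bucket name and credentials are placeholders; in practice they mirror the `--conf` values above):
    
    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    
    object ListUploads {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // Placeholder S3A settings.
        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        conf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
        conf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
    
        val uploadPath = new Path("s3a://my-bucket/")  // placeholder upload path
        val fs = FileSystem.get(uploadPath.toUri, conf)
    
        // List the per-submission spark-upload-<uuid> directories.
        fs.listStatus(uploadPath).foreach(status => println(status.getPath))
      }
    }
    ```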
    Added integration tests based on [Ceph nano](https://github.com/ceph/cn), which looks very [active](http://www.sebastien-han.fr/blog/2019/02/24/Ceph-nano-is-getting-better-and-better/).
    Unfortunately, MinIO needs Hadoop >= 2.8.
    
    Closes #23546 from skonto/support-client-deps.
    
    Authored-by: Stavros Kontopoulos <st...@lightbend.com>
    Signed-off-by: Erik Erlandson <ee...@redhat.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      |  91 ++++++--
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |  12 +-
 docs/running-on-kubernetes.md                      |  37 +++-
 .../scala/org/apache/spark/deploy/k8s/Config.scala |   7 +
 .../apache/spark/deploy/k8s/KubernetesUtils.scala  |  82 ++++++-
 .../k8s/features/BasicDriverFeatureStep.scala      |  10 +-
 .../k8s/features/DriverCommandFeatureStep.scala    |  10 +-
 .../kubernetes/integration-tests/pom.xml           |   6 +
 .../k8s/integrationtest/DepsTestsSuite.scala       | 239 +++++++++++++++++++++
 .../k8s/integrationtest/KubernetesSuite.scala      |  34 +--
 .../integrationtest/KubernetesTestComponents.scala |   3 +
 .../deploy/k8s/integrationtest/PVTestsSuite.scala  |  28 +--
 .../spark/deploy/k8s/integrationtest/Utils.scala   |  54 ++++-
 .../backend/minikube/Minikube.scala                |   7 +-
 14 files changed, 545 insertions(+), 75 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 59b638b..926e2df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.{Properties, Try}
 
+import org.apache.commons.io.FilenameUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -222,7 +223,7 @@ private[spark] class SparkSubmit extends Logging {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
-    val sparkConf = new SparkConf()
+    val sparkConf = args.toSparkConf()
     var childMainClass = ""
 
     // Set the cluster manager
@@ -313,6 +314,9 @@ private[spark] class SparkSubmit extends Logging {
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
     val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
     val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
+    val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
+    val isKubernetesClusterModeDriver = isKubernetesClient &&
+      sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
     val isMesosClient = clusterManager == MESOS && deployMode == CLIENT
 
     if (!isMesosCluster && !isStandAloneCluster) {
@@ -323,9 +327,25 @@ private[spark] class SparkSubmit extends Logging {
         args.ivySettingsPath)
 
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-        if (args.isPython || isInternal(args.primaryResource)) {
-          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+        // In K8s client mode, when in the driver, add resolved jars early as we might need
+        // them at the submit time for artifact downloading.
+        // For example we might use the dependencies for downloading
+        // files from a Hadoop Compatible fs eg. S3. In this case the user might pass:
+        // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
+        if (isKubernetesClusterModeDriver) {
+          val loader = getSubmitClassLoader(sparkConf)
+          for (jar <- resolvedMavenCoordinates.split(",")) {
+            addJarToClasspath(jar, loader)
+          }
+        } else if (isKubernetesCluster) {
+          // We need this in K8s cluster mode so that we can upload local deps
+          // via the k8s application, like in cluster mode driver
+          childClasspath ++= resolvedMavenCoordinates.split(",")
+        } else {
+          args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+          if (args.isPython || isInternal(args.primaryResource)) {
+            args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+          }
         }
       }
 
@@ -380,6 +400,17 @@ private[spark] class SparkSubmit extends Logging {
       localPyFiles = Option(args.pyFiles).map {
         downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
+
+      if (isKubernetesClusterModeDriver) {
+        // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
+        // Executors will get the jars from the Spark file server.
+        // Explicitly download the related files here
+        args.jars = renameResourcesToLocalFS(args.jars, localJars)
+        val localFiles = Option(args.files).map {
+          downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+        }.orNull
+        args.files = renameResourcesToLocalFS(args.files, localFiles)
+      }
     }
 
     // When running in YARN, for some remote resources with scheme:
@@ -535,11 +566,13 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
 
       // Propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
-      OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
-        confKey = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
-      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
+      OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
+        CLUSTER, confKey = "spark.jars.packages"),
+      OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
+        CLUSTER, confKey = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
+        CLUSTER, confKey = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.excludes"),
 
       // Yarn only
@@ -777,6 +810,21 @@ private[spark] class SparkSubmit extends Logging {
     (childArgs, childClasspath, sparkConf, childMainClass)
   }
 
+  private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
+    if (resources != null && localResources != null) {
+      val localResourcesSeq = Utils.stringToSeq(localResources)
+      Utils.stringToSeq(resources).map { resource =>
+        val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
+        localResourcesSeq.find { localUri =>
+          val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
+          filenameRemote == filenameLocal
+        }.getOrElse(resource)
+      }.mkString(",")
+    } else {
+      resources
+    }
+  }
+
   // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
   // renewer set to the YARN ResourceManager.  Since YARN isn't configured in Mesos or Kubernetes
   // mode, we must trick it into thinking we're YARN.
@@ -787,6 +835,19 @@ private[spark] class SparkSubmit extends Logging {
     sparkConf.set(key, shortUserName)
   }
 
+  private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
+    val loader =
+      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
+        new ChildFirstURLClassLoader(new Array[URL](0),
+          Thread.currentThread.getContextClassLoader)
+      } else {
+        new MutableURLClassLoader(new Array[URL](0),
+          Thread.currentThread.getContextClassLoader)
+      }
+    Thread.currentThread.setContextClassLoader(loader)
+    loader
+  }
+
   /**
    * Run the main method of the child class using the submit arguments.
    *
@@ -814,17 +875,7 @@ private[spark] class SparkSubmit extends Logging {
       logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
       logInfo("\n")
     }
-
-    val loader =
-      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
-        new ChildFirstURLClassLoader(new Array[URL](0),
-          Thread.currentThread.getContextClassLoader)
-      } else {
-        new MutableURLClassLoader(new Array[URL](0),
-          Thread.currentThread.getContextClassLoader)
-      }
-    Thread.currentThread.setContextClassLoader(loader)
-
+    val loader = getSubmitClassLoader(sparkConf)
     for (jar <- childClasspath) {
       addJarToClasspath(jar, loader)
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b0c187d..65c9cb9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1325,12 +1325,12 @@ class SparkSubmitSuite
       "--class", "Foo",
       "app.jar")
     val conf = new SparkSubmitArguments(clArgs).toSparkConf()
-     Seq(
-       testConf,
-       masterConf
-     ).foreach { case (k, v) =>
-       conf.get(k) should be (v)
-     }
+    Seq(
+      testConf,
+      masterConf
+    ).foreach { case (k, v) =>
+      conf.get(k) should be (v)
+    }
   }
 }
 
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 72833cc..8a424b5 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -208,8 +208,31 @@ If your application's dependencies are all hosted in remote locations like HDFS
 by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images.
 Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the
 `SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to
-dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
-client's local file system is currently not yet supported.
+dependencies in custom-built Docker images in `spark-submit`. We support dependencies from the submission
+client's local file system using the `file://` scheme or without a scheme (using a full path), where the destination should be a Hadoop compatible filesystem.
+A typical example of this using S3 is via passing the following options:
+
+```
+...
+--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
+--conf spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path
+--conf spark.hadoop.fs.s3a.access.key=...
+--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+--conf spark.hadoop.fs.s3a.fast.upload=true
+--conf spark.hadoop.fs.s3a.secret.key=....
+--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
+file:///full/path/to/app.jar
+```
+The app jar file will be uploaded to the S3 and then when the driver is launched it will be downloaded
+to the driver pod and will be added to its classpath. Spark will generate a subdir under the upload path with a random name
+to avoid conflicts with spark apps running in parallel. User could manage the subdirs created according to his needs.
+
+The client scheme is supported for the application jar, and dependencies specified by properties `spark.jars` and `spark.files`.
+
+Important: all client-side dependencies will be uploaded to the given path with a flat directory structure so
+file names must be unique otherwise files will be overwritten. Also make sure in the derived k8s image default ivy dir
+has the required access rights or modify the settings as above. The latter is also important if you use `--packages` in
+cluster mode.
 
 ## Secret Management
 Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
@@ -455,7 +478,6 @@ There are several Spark on Kubernetes features that are currently being worked o
 Some of these include:
 
 * Dynamic Resource Allocation and External Shuffle Service
-* Local File Dependency Management
 * Job Queues and Resource Management
 
 # Configuration
@@ -1078,6 +1100,15 @@ See the [configuration page](configuration.html) for information on Spark config
   Specify the grace period in seconds when deleting a Spark application using spark-submit.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.file.upload.path</code></td>
+  <td>(none)</td>
+  <td>
+    Path to store files at the spark submit side in cluster mode. For example:
+    <code>spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path</code>
+    Files should be specified as <code>file://path/to/file</code> or as an absolute path.
+  </td>
+</tr>
 </table>
 
 #### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index cc1bfd9..7e99220 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -338,6 +338,13 @@ private[spark] object Config extends Logging {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
+  val KUBERNETES_FILE_UPLOAD_PATH =
+    ConfigBuilder("spark.kubernetes.file.upload.path")
+      .doc("Hadoop compatible file system path where files from the local file system " +
+        "will be uploded to in cluster mode.")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 3f7fcec..a571035 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,18 +16,25 @@
  */
 package org.apache.spark.deploy.k8s
 
-import java.io.File
+import java.io.{File, IOException}
+import java.net.URI
 import java.security.SecureRandom
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.commons.codec.binary.Hex
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.Utils.getHadoopFileSystem
 
 private[spark] object KubernetesUtils extends Logging {
 
@@ -209,4 +216,77 @@ private[spark] object KubernetesUtils extends Logging {
     Hex.encodeHexString(random) + time
   }
 
+  /**
+   * Upload files and modify their uris
+   */
+  def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
+    : Iterable[String] = {
+    fileUris.map { uri =>
+      uploadFileUri(uri, conf)
+    }
+  }
+
+  private def isLocalDependency(uri: URI): Boolean = {
+    uri.getScheme match {
+      case null | "file" => true
+      case _ => false
+    }
+  }
+
+  def isLocalAndResolvable(resource: String): Boolean = {
+    resource != SparkLauncher.NO_RESOURCE &&
+      isLocalDependency(Utils.resolveURI(resource))
+  }
+
+  def renameMainAppResource(resource: String, conf: SparkConf): String = {
+    if (isLocalAndResolvable(resource)) {
+      SparkLauncher.NO_RESOURCE
+    } else {
+      resource
+   }
+  }
+
+  def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
+    conf match {
+      case Some(sConf) =>
+        if (sConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
+          val fileUri = Utils.resolveURI(uri)
+          try {
+            val hadoopConf = SparkHadoopUtil.get.newConfiguration(sConf)
+            val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get
+            val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), hadoopConf)
+            val randomDirName = s"spark-upload-${UUID.randomUUID()}"
+            fs.mkdirs(new Path(s"${uploadPath}/${randomDirName}"))
+            val targetUri = s"${uploadPath}/${randomDirName}/${fileUri.getPath.split("/").last}"
+            log.info(s"Uploading file: ${fileUri.getPath} to dest: $targetUri...")
+            uploadFileToHadoopCompatibleFS(new Path(fileUri.getPath), new Path(targetUri), fs)
+            targetUri
+          } catch {
+            case e: Exception =>
+              throw new SparkException(s"Uploading file ${fileUri.getPath} failed...", e)
+          }
+        } else {
+          throw new SparkException("Please specify " +
+            "spark.kubernetes.file.upload.path property.")
+        }
+      case _ => throw new SparkException("Spark configuration is missing...")
+    }
+  }
+
+  /**
+   * Upload a file to a Hadoop-compatible filesystem.
+   */
+  private def uploadFileToHadoopCompatibleFS(
+      src: Path,
+      dest: Path,
+      fs: FileSystem,
+      delSrc : Boolean = false,
+      overwrite: Boolean = true): Unit = {
+    try {
+      fs.copyFromLocalFile(false, true, src, dest)
+    } catch {
+      case e: IOException =>
+        throw new SparkException(s"Error uploading file ${src.getName}", e)
+    }
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 61fc67f..92463df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.UI._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 
@@ -156,6 +155,15 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix,
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
+    // try upload local, resolvable files to a hadoop compatible file system
+    Seq(JARS, FILES).foreach { key =>
+      val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
+      val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
+      if (resolved.nonEmpty) {
+        additionalProps.put(key.key, resolved.mkString(","))
+      }
+    }
     additionalProps.toMap
   }
 }
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index 9c9cd1e..7faf0d7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -24,9 +24,7 @@ import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
-import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.util.Utils
 
 /**
  * Creates the driver command for running the user app, and propagates needed configuration so
@@ -88,11 +86,17 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   }
 
   private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = {
+    // re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
+    val resolvedResource = if (conf.mainAppResource.isInstanceOf[JavaMainAppResource]) {
+      KubernetesUtils.renameMainAppResource(resource, conf.sparkConf)
+    } else {
+      resource
+    }
     new ContainerBuilder(pod.container)
       .addToArgs("driver")
       .addToArgs("--properties-file", SPARK_CONF_PATH)
       .addToArgs("--class", conf.mainClass)
-      .addToArgs(resource)
+      .addToArgs(resolvedResource)
       .addToArgs(conf.appArgs: _*)
   }
 }
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 2248482..d129ffb 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -76,6 +76,12 @@
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>1.7.4</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
new file mode 100644
index 0000000..b0c4182
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net.URL
+
+import scala.collection.JavaConverters._
+
+import com.amazonaws.auth.BasicAWSCredentials
+import com.amazonaws.services.s3.AmazonS3Client
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, FILE_CONTENTS, HOST_PATH}
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT}
+import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
+
+private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
+  import KubernetesSuite.k8sTestTag
+
+  val cName = "ceph-nano"
+  val svcName = s"$cName-s3"
+  val bucket = "spark"
+
+  private def getCephContainer(): Container = {
+    val envVars = Map ( "NETWORK_AUTO_DETECT" -> "4",
+      "RGW_CIVETWEB_PORT" -> "8000",
+      "SREE_PORT" -> "5001",
+      "CEPH_DEMO_UID" -> "nano",
+      "CEPH_DAEMON" -> "demo",
+      "DEBUG" -> "verbose"
+    ).map( envV =>
+      new EnvVarBuilder()
+        .withName(envV._1)
+        .withValue(envV._2)
+        .build()
+    ).toArray
+
+    val resources = Map(
+      "cpu" -> new QuantityBuilder()
+        .withAmount("1")
+        .build(),
+      "memory" -> new QuantityBuilder()
+        .withAmount("512M")
+        .build()
+    ).asJava
+
+    new ContainerBuilder()
+      .withImage("ceph/daemon:v4.0.0-stable-4.0-master-centos-7-x86_64")
+      .withImagePullPolicy("Always")
+      .withName(cName)
+      .withPorts(new ContainerPortBuilder()
+          .withName(svcName)
+          .withProtocol("TCP")
+          .withContainerPort(8000)
+        .build()
+      )
+      .withResources(new ResourceRequirementsBuilder()
+        .withLimits(resources)
+        .withRequests(resources)
+        .build()
+      )
+      .withEnv(envVars: _*)
+      .build()
+  }
+
+  // Based on https://github.com/ceph/cn
+  private def setupCephStorage(): Unit = {
+    val labels = Map("app" -> "ceph", "daemon" -> "nano").asJava
+    val cephService = new ServiceBuilder()
+      .withNewMetadata()
+        .withName(svcName)
+      .withLabels(labels)
+      .endMetadata()
+      .withNewSpec()
+        .withPorts(new ServicePortBuilder()
+          .withName("https")
+          .withPort(8000)
+          .withProtocol("TCP")
+          .withTargetPort(new IntOrString(8000))
+          .build()
+        )
+        .withType("NodePort")
+        .withSelector(labels)
+      .endSpec()
+      .build()
+
+    val cephStatefulSet = new StatefulSetBuilder()
+      .withNewMetadata()
+        .withName(cName)
+        .withLabels(labels)
+      .endMetadata()
+      .withNewSpec()
+        .withReplicas(1)
+        .withNewSelector()
+          .withMatchLabels(Map("app" -> "ceph").asJava)
+        .endSelector()
+        .withServiceName(cName)
+        .withNewTemplate()
+          .withNewMetadata()
+            .withName(cName)
+            .withLabels(labels)
+           .endMetadata()
+          .withNewSpec()
+            .withContainers(getCephContainer())
+          .endSpec()
+        .endTemplate()
+      .endSpec()
+      .build()
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .services()
+      .create(cephService)
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .apps()
+      .statefulSets()
+      .create(cephStatefulSet)
+  }
+
+ private def deleteCephStorage(): Unit = {
+    kubernetesTestComponents
+      .kubernetesClient
+      .apps()
+      .statefulSets()
+      .withName(cName)
+      .delete()
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .services()
+      .withName(svcName)
+      .delete()
+  }
+
+  test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
+    val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
+    try {
+      setupCephStorage()
+      val cephUrlStr = getServiceUrl(svcName)
+      val cephUrl = new URL(cephUrlStr)
+      val cephHost = cephUrl.getHost
+      val cephPort = cephUrl.getPort
+      val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
+      val (accessKey, secretKey) = getCephCredentials()
+      sparkAppConf
+        .set("spark.hadoop.fs.s3a.access.key", accessKey)
+        .set("spark.hadoop.fs.s3a.secret.key", secretKey)
+        .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+        .set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort")
+        .set("spark.kubernetes.file.upload.path", s"s3a://$bucket")
+        .set("spark.files", s"$HOST_PATH/$fileName")
+        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+        .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
+          "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
+        .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
+      createS3Bucket(accessKey, secretKey, cephUrlStr)
+      runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
+        appArgs = Array(fileName),
+        timeout = Option(DEPS_TIMEOUT))
+    } finally {
+      // make sure this always runs
+      deleteCephStorage()
+    }
+  }
+
+  // There isn't a cleaner way to get the credentials
+  // when ceph-nano runs on k8s
+  private def getCephCredentials(): (String, String) = {
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      val cephPod = kubernetesTestComponents
+        .kubernetesClient
+        .pods()
+        .withName(s"$cName-0")
+        .get()
+      implicit val podName: String = cephPod.getMetadata.getName
+      implicit val components: KubernetesTestComponents = kubernetesTestComponents
+      val contents = Utils.executeCommand("cat", "/nano_user_details")
+    (extractS3Key(contents, "access_key"), extractS3Key(contents, "secret_key"))
+    }
+  }
+
+  private def extractS3Key(data: String, key: String): String = {
+    data.split("\n")
+      .filter(_.contains(key))
+      .head
+      .split(":")
+      .last
+      .trim
+      .replaceAll("[,|\"]", "")
+  }
+
+  private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = {
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      try {
+        val credentials = new BasicAWSCredentials(accessKey, secretKey)
+        val s3client = new AmazonS3Client(credentials)
+        s3client.setEndpoint(endPoint)
+        s3client.createBucket(bucket)
+      } catch {
+        case e: Exception =>
+          throw new SparkException(s"Failed to create bucket $bucket.", e)
+      }
+    }
+  }
+
+  private def getServiceUrl(serviceName: String): String = {
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      // ns is always available either random or provided by the user
+      Minikube.minikubeServiceAction(serviceName, "-n", kubernetesTestComponents.namespace, "--url")
+    }
+  }
+}
+
+private[spark] object DepsTestsSuite {
+  val HOST_PATH = "/tmp"
+  val FILE_CONTENTS = "test deps"
+  // increase the default because jar resolution takes time in the container
+  val DEPS_TIMEOUT = PatienceConfiguration.Timeout(Span(4, Minutes))
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index bc0bb20..51e758f 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -30,9 +30,10 @@ import io.fabric8.kubernetes.client.Watcher.Action
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag}
 import org.scalatest.Matchers
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.concurrent.PatienceConfiguration.{Interval, Timeout}
 import org.scalatest.time.{Minutes, Seconds, Span}
 
-import org.apache.spark.{SPARK_VERSION, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
 import org.apache.spark.internal.Logging
@@ -41,7 +42,7 @@ import org.apache.spark.internal.config._
 class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
   with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
-  with Logging with Eventually with Matchers {
+  with DepsTestsSuite with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
 
@@ -120,12 +121,8 @@ class KubernetesSuite extends SparkFunSuite
     pyImage = testImageRef(sys.props.getOrElse(CONFIG_KEY_IMAGE_PYTHON, "spark-py"))
     rImage = testImageRef(sys.props.getOrElse(CONFIG_KEY_IMAGE_R, "spark-r"))
 
-    val scalaVersion = scala.util.Properties.versionNumberString
-      .split("\\.")
-      .take(2)
-      .mkString(".")
     containerLocalSparkDistroExamplesJar =
-      s"local:///opt/spark/examples/jars/spark-examples_$scalaVersion-${SPARK_VERSION}.jar"
+      s"local:///opt/spark/examples/jars/${Utils.getExamplesJarName()}"
     testBackend = IntegrationTestBackendFactory.getTestBackend
     testBackend.initialize()
     kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
@@ -198,7 +195,7 @@ class KubernetesSuite extends SparkFunSuite
       appLocator,
       isJVM,
       None,
-      interval)
+      Option((interval, None)))
   }
 
   protected def runSparkRemoteCheckAndVerifyCompletion(
@@ -206,7 +203,8 @@ class KubernetesSuite extends SparkFunSuite
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
       appArgs: Array[String],
-      appLocator: String = appLocator): Unit = {
+      appLocator: String = appLocator,
+      timeout: Option[PatienceConfiguration.Timeout] = None): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
       SPARK_REMOTE_MAIN_CLASS,
@@ -215,7 +213,8 @@ class KubernetesSuite extends SparkFunSuite
       driverPodChecker,
       executorPodChecker,
       appLocator,
-      true)
+      true,
+      executorPatience = Option((None, timeout)))
   }
 
   protected def runSparkJVMCheckAndVerifyCompletion(
@@ -265,7 +264,7 @@ class KubernetesSuite extends SparkFunSuite
       appLocator: String,
       isJVM: Boolean,
       pyFiles: Option[String] = None,
-      interval: Option[PatienceConfiguration.Interval] = None): Unit = {
+      executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = {
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
@@ -306,8 +305,16 @@ class KubernetesSuite extends SparkFunSuite
         }
       })
 
-    val patienceInterval = interval.getOrElse(INTERVAL)
-    Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) }
+    val (patienceInterval, patienceTimeout) = {
+      executorPatience match {
+        case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
+        case _ => (INTERVAL, TIMEOUT)
+      }
+    }
+
+    Eventually.eventually(patienceTimeout, patienceInterval) {
+      execPods.values.nonEmpty should be (true)
+    }
     execWatcher.close()
     execPods.values.foreach(executorPodChecker(_))
     Eventually.eventually(TIMEOUT, patienceInterval) {
@@ -408,6 +415,7 @@ class KubernetesSuite extends SparkFunSuite
 
 private[spark] object KubernetesSuite {
   val k8sTestTag = Tag("k8s")
+  val MinikubeTag = Tag("minikube")
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
   val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest"
   val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 50a7ef7..4cfda8a 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.scalatest.concurrent.Eventually
 
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.JARS
@@ -93,6 +94,8 @@ private[spark] class SparkAppConf {
   override def toString: String = map.toString
 
   def toStringArray: Iterable[String] = map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}"))
+
+  def toSparkConf: SparkConf = new SparkConf().setAll(map)
 }
 
 private[spark] case class SparkAppArguments(
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
index d7a237f..7776bc6 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
@@ -16,13 +16,10 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import java.io.{File, PrintWriter}
-
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder
-import org.scalatest.Tag
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Milliseconds, Span}
 
@@ -125,25 +122,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
-  private def createTempFile(): String = {
-    val filename = try {
-      val f = File.createTempFile("tmp", ".txt", new File(HOST_PATH))
-      f.deleteOnExit()
-      new PrintWriter(f) {
-        try {
-          write(FILE_CONTENTS)
-        } finally {
-          close()
-        }
-      }
-      f.getName
-    } catch {
-      case e: Exception => e.printStackTrace(); throw e;
-    }
-    filename
-  }
-
-  test("Test PVs with local storage", k8sTestTag, MinikubeTag) {
+  test("PVs with local storage", k8sTestTag, MinikubeTag) {
     sparkAppConf
       .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path",
         CONTAINER_MOUNT_PATH)
@@ -153,7 +132,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
         CONTAINER_MOUNT_PATH)
       .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName",
         PVC_NAME)
-    val file = createTempFile()
+    val file = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
     try {
       setupLocalStorage()
       runDFSReadWriteAndVerifyCompletion(
@@ -170,14 +149,13 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
         interval = Some(PV_TESTS_INTERVAL)
       )
     } finally {
-      // make sure this always run
+      // make sure this always runs
       deleteLocalStorage()
     }
   }
 }
 
 private[spark] object PVTestsSuite {
-  val MinikubeTag = Tag("minikube")
   val STORAGE_NAME = "test-local-storage"
   val PV_NAME = "test-local-pv"
   val PVC_NAME = "test-local-pvc"
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index d425f70..a687a1b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -16,15 +16,26 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import java.io.Closeable
-import java.net.URI
+import java.io.{Closeable, File, PrintWriter}
+import java.nio.file.{Files, Path}
+
+import scala.collection.JavaConverters._
 
 import org.apache.commons.io.output.ByteArrayOutputStream
 
+import org.apache.spark.{SPARK_VERSION, SparkException}
 import org.apache.spark.internal.Logging
 
 object Utils extends Logging {
 
+  def getExamplesJarName(): String = {
+    val scalaVersion = scala.util.Properties.versionNumberString
+      .split("\\.")
+      .take(2)
+      .mkString(".")
+    s"spark-examples_$scalaVersion-${SPARK_VERSION}.jar"
+  }
+
   def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
     val resource = createResource
     try f.apply(resource) finally resource.close()
@@ -49,4 +60,43 @@ object Utils extends Logging {
     out.flush()
     out.toString()
   }
+
+  def createTempFile(contents: String, hostPath: String): String = {
+    val filename = try {
+      val f = File.createTempFile("tmp", ".txt", new File(hostPath))
+      f.deleteOnExit()
+      new PrintWriter(f) {
+        try {
+          write(contents)
+        } finally {
+          close()
+        }
+      }
+      f.getName
+    } catch {
+      case e: Exception => e.printStackTrace(); throw e;
+    }
+    filename
+  }
+
+  def getExamplesJarAbsolutePath(sparkHomeDir: Path): String = {
+    val jarName = getExamplesJarName()
+    val jarPathsFound = Files
+      .walk(sparkHomeDir)
+      .filter(Files.isRegularFile(_))
+      .filter((f: Path) => {f.toFile.getName == jarName})
+    // we should not have more than one here under current test build dir
+    // we only need one though
+    val jarPath = jarPathsFound
+      .iterator()
+      .asScala
+      .map(_.toAbsolutePath.toString)
+      .toArray
+      .headOption
+    jarPath match {
+      case Some(jar) => jar
+      case _ => throw new SparkException(s"No valid $jarName file was found " +
+        s"under spark home test dir ${sparkHomeDir.toAbsolutePath}!")
+    }
+  }
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 78ef44b..ce2ce1c 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -112,7 +112,12 @@ private[spark] object Minikube extends Logging {
 
   private def executeMinikube(action: String, args: String*): Seq[String] = {
     ProcessUtils.executeProcess(
-      Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
+      Array("bash", "-c", s"minikube $action ${args.mkString(" ")}"),
+      MINIKUBE_STARTUP_TIMEOUT_SECONDS)
+  }
+
+  def minikubeServiceAction(args: String*): String = {
+    executeMinikube("service", args: _*).head
   }
 }
 

