You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ej...@apache.org on 2019/05/22 23:16:41 UTC
[spark] branch master updated: [SPARK-23153][K8S] Support client
dependencies with a Hadoop Compatible File System
This is an automated email from the ASF dual-hosted git repository.
eje pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5e74570 [SPARK-23153][K8S] Support client dependencies with a Hadoop Compatible File System
5e74570 is described below
commit 5e74570c8f5e7dfc1ca1c53c177827c5cea57bf1
Author: Stavros Kontopoulos <st...@lightbend.com>
AuthorDate: Wed May 22 16:15:42 2019 -0700
[SPARK-23153][K8S] Support client dependencies with a Hadoop Compatible File System
## What changes were proposed in this pull request?
- solves the current issue with --packages in cluster mode (there is no ticket for it). Also note some past [issues](https://issues.apache.org/jira/browse/SPARK-22657) that occurred when Hadoop libraries were used on the spark-submit side.
- supports spark.jars, spark.files, app jar.
It works as follows:
Spark submit uploads the deps to the HCFS. Then the driver serves the deps via the Spark file server.
No HCFS URIs are propagated.
The related design document is [here](https://docs.google.com/document/d/1peg_qVhLaAl4weo5C51jQicPwLclApBsdR1To2fgc48/edit). The next option to add is the RSS, but it has to be improved given the past discussion about it (Spark 2.3).
## How was this patch tested?
- Run integration test suite.
- Run an example using S3:
```
./bin/spark-submit \
...
--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.memory=1G \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
--conf spark.driver.memory=1G \
--conf spark.executor.instances=2 \
--conf spark.sql.streaming.metricsEnabled=true \
--conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image=skonto/spark:k8s-3.0.0 \
--conf spark.kubernetes.file.upload.path=s3a://fdp-stavros-test \
--conf spark.hadoop.fs.s3a.access.key=... \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.fast.upload=true \
--conf spark.kubernetes.executor.deleteOnTermination=false \
--conf spark.hadoop.fs.s3a.secret.key=... \
--conf spark.files=client:///...resolv.conf \
file:///my.jar **
```
Added integration tests based on [Ceph nano](https://github.com/ceph/cn). Looks very [active](http://www.sebastien-han.fr/blog/2019/02/24/Ceph-nano-is-getting-better-and-better/).
Unfortunately, Minio needs Hadoop >= 2.8.
Closes #23546 from skonto/support-client-deps.
Authored-by: Stavros Kontopoulos <st...@lightbend.com>
Signed-off-by: Erik Erlandson <ee...@redhat.com>
---
.../org/apache/spark/deploy/SparkSubmit.scala | 91 ++++++--
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 12 +-
docs/running-on-kubernetes.md | 37 +++-
.../scala/org/apache/spark/deploy/k8s/Config.scala | 7 +
.../apache/spark/deploy/k8s/KubernetesUtils.scala | 82 ++++++-
.../k8s/features/BasicDriverFeatureStep.scala | 10 +-
.../k8s/features/DriverCommandFeatureStep.scala | 10 +-
.../kubernetes/integration-tests/pom.xml | 6 +
.../k8s/integrationtest/DepsTestsSuite.scala | 239 +++++++++++++++++++++
.../k8s/integrationtest/KubernetesSuite.scala | 34 +--
.../integrationtest/KubernetesTestComponents.scala | 3 +
.../deploy/k8s/integrationtest/PVTestsSuite.scala | 28 +--
.../spark/deploy/k8s/integrationtest/Utils.scala | 54 ++++-
.../backend/minikube/Minikube.scala | 7 +-
14 files changed, 545 insertions(+), 75 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 59b638b..926e2df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.{Properties, Try}
+import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -222,7 +223,7 @@ private[spark] class SparkSubmit extends Logging {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
- val sparkConf = new SparkConf()
+ val sparkConf = args.toSparkConf()
var childMainClass = ""
// Set the cluster manager
@@ -313,6 +314,9 @@ private[spark] class SparkSubmit extends Logging {
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
+ val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
+ val isKubernetesClusterModeDriver = isKubernetesClient &&
+ sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
val isMesosClient = clusterManager == MESOS && deployMode == CLIENT
if (!isMesosCluster && !isStandAloneCluster) {
@@ -323,9 +327,25 @@ private[spark] class SparkSubmit extends Logging {
args.ivySettingsPath)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
- args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
- if (args.isPython || isInternal(args.primaryResource)) {
- args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+ // In K8s client mode, when in the driver, add resolved jars early as we might need
+ // them at the submit time for artifact downloading.
+ // For example we might use the dependencies for downloading
+ // files from a Hadoop Compatible fs eg. S3. In this case the user might pass:
+ // --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
+ if (isKubernetesClusterModeDriver) {
+ val loader = getSubmitClassLoader(sparkConf)
+ for (jar <- resolvedMavenCoordinates.split(",")) {
+ addJarToClasspath(jar, loader)
+ }
+ } else if (isKubernetesCluster) {
+ // We need this in K8s cluster mode so that we can upload local deps
+ // via the k8s application, like in cluster mode driver
+ childClasspath ++= resolvedMavenCoordinates.split(",")
+ } else {
+ args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+ if (args.isPython || isInternal(args.primaryResource)) {
+ args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+ }
}
}
@@ -380,6 +400,17 @@ private[spark] class SparkSubmit extends Logging {
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
+
+ if (isKubernetesClusterModeDriver) {
+ // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
+ // Executors will get the jars from the Spark file server.
+ // Explicitly download the related files here
+ args.jars = renameResourcesToLocalFS(args.jars, localJars)
+ val localFiles = Option(args.files).map {
+ downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+ }.orNull
+ args.files = renameResourcesToLocalFS(args.files, localFiles)
+ }
}
// When running in YARN, for some remote resources with scheme:
@@ -535,11 +566,13 @@ private[spark] class SparkSubmit extends Logging {
OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
// Propagate attributes for dependency resolution at the driver side
- OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
- OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
- confKey = "spark.jars.repositories"),
- OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
- OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
+ OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
+ CLUSTER, confKey = "spark.jars.packages"),
+ OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
+ CLUSTER, confKey = "spark.jars.repositories"),
+ OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
+ CLUSTER, confKey = "spark.jars.ivy"),
+ OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.excludes"),
// Yarn only
@@ -777,6 +810,21 @@ private[spark] class SparkSubmit extends Logging {
(childArgs, childClasspath, sparkConf, childMainClass)
}
+ private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
+ if (resources != null && localResources != null) {
+ val localResourcesSeq = Utils.stringToSeq(localResources)
+ Utils.stringToSeq(resources).map { resource =>
+ val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
+ localResourcesSeq.find { localUri =>
+ val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
+ filenameRemote == filenameLocal
+ }.getOrElse(resource)
+ }.mkString(",")
+ } else {
+ resources
+ }
+ }
+
// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
// mode, we must trick it into thinking we're YARN.
@@ -787,6 +835,19 @@ private[spark] class SparkSubmit extends Logging {
sparkConf.set(key, shortUserName)
}
+ private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
+ val loader =
+ if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
+ new ChildFirstURLClassLoader(new Array[URL](0),
+ Thread.currentThread.getContextClassLoader)
+ } else {
+ new MutableURLClassLoader(new Array[URL](0),
+ Thread.currentThread.getContextClassLoader)
+ }
+ Thread.currentThread.setContextClassLoader(loader)
+ loader
+ }
+
/**
* Run the main method of the child class using the submit arguments.
*
@@ -814,17 +875,7 @@ private[spark] class SparkSubmit extends Logging {
logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
logInfo("\n")
}
-
- val loader =
- if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
- new ChildFirstURLClassLoader(new Array[URL](0),
- Thread.currentThread.getContextClassLoader)
- } else {
- new MutableURLClassLoader(new Array[URL](0),
- Thread.currentThread.getContextClassLoader)
- }
- Thread.currentThread.setContextClassLoader(loader)
-
+ val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b0c187d..65c9cb9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1325,12 +1325,12 @@ class SparkSubmitSuite
"--class", "Foo",
"app.jar")
val conf = new SparkSubmitArguments(clArgs).toSparkConf()
- Seq(
- testConf,
- masterConf
- ).foreach { case (k, v) =>
- conf.get(k) should be (v)
- }
+ Seq(
+ testConf,
+ masterConf
+ ).foreach { case (k, v) =>
+ conf.get(k) should be (v)
+ }
}
}
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 72833cc..8a424b5 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -208,8 +208,31 @@ If your application's dependencies are all hosted in remote locations like HDFS
by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images.
Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the
`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to
-dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
-client's local file system is currently not yet supported.
+dependencies in custom-built Docker images in `spark-submit`. We support dependencies from the submission
+client's local file system using the `file://` scheme or without a scheme (using a full path), where the destination should be a Hadoop compatible filesystem.
+A typical example of this using S3 is via passing the following options:
+
+```
+...
+--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
+--conf spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path
+--conf spark.hadoop.fs.s3a.access.key=...
+--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+--conf spark.hadoop.fs.s3a.fast.upload=true
+--conf spark.hadoop.fs.s3a.secret.key=....
+--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
+file:///full/path/to/app.jar
+```
+The app jar file will be uploaded to S3 and then, when the driver is launched, it will be downloaded
+to the driver pod and added to its classpath. Spark will generate a subdirectory under the upload path with a random name
+to avoid conflicts with Spark apps running in parallel. Users can manage the created subdirectories according to their needs.
+
+The client scheme is supported for the application jar, and dependencies specified by properties `spark.jars` and `spark.files`.
+
+Important: all client-side dependencies will be uploaded to the given path with a flat directory structure, so
+file names must be unique, otherwise files will be overwritten. Also make sure that the default ivy dir in the
+derived k8s image has the required access rights, or modify the settings as above. The latter is also important
+if you use `--packages` in cluster mode.
## Secret Management
Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
@@ -455,7 +478,6 @@ There are several Spark on Kubernetes features that are currently being worked o
Some of these include:
* Dynamic Resource Allocation and External Shuffle Service
-* Local File Dependency Management
* Job Queues and Resource Management
# Configuration
@@ -1078,6 +1100,15 @@ See the [configuration page](configuration.html) for information on Spark config
Specify the grace period in seconds when deleting a Spark application using spark-submit.
</td>
</tr>
+<tr>
+ <td><code>spark.kubernetes.file.upload.path</code></td>
+ <td>(none)</td>
+ <td>
Path to store files on the spark-submit side in cluster mode. For example:
<code>spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path</code>
Files should be specified as <code>file://path/to/file</code> or as an absolute path.
+ </td>
+</tr>
</table>
#### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index cc1bfd9..7e99220 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -338,6 +338,13 @@ private[spark] object Config extends Logging {
.timeConf(TimeUnit.SECONDS)
.createOptional
+ val KUBERNETES_FILE_UPLOAD_PATH =
+ ConfigBuilder("spark.kubernetes.file.upload.path")
+ .doc("Hadoop compatible file system path where files from the local file system " +
+ "will be uploaded to in cluster mode.")
+ .stringConf
+ .createOptional
+
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 3f7fcec..a571035 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,18 +16,25 @@
*/
package org.apache.spark.deploy.k8s
-import java.io.File
+import java.io.{File, IOException}
+import java.net.URI
import java.security.SecureRandom
+import java.util.UUID
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.Utils.getHadoopFileSystem
private[spark] object KubernetesUtils extends Logging {
@@ -209,4 +216,77 @@ private[spark] object KubernetesUtils extends Logging {
Hex.encodeHexString(random) + time
}
+ /**
+ * Upload files and modify their uris
+ */
+ def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
+ : Iterable[String] = {
+ fileUris.map { uri =>
+ uploadFileUri(uri, conf)
+ }
+ }
+
+ private def isLocalDependency(uri: URI): Boolean = {
+ uri.getScheme match {
+ case null | "file" => true
+ case _ => false
+ }
+ }
+
+ def isLocalAndResolvable(resource: String): Boolean = {
+ resource != SparkLauncher.NO_RESOURCE &&
+ isLocalDependency(Utils.resolveURI(resource))
+ }
+
+ def renameMainAppResource(resource: String, conf: SparkConf): String = {
+ if (isLocalAndResolvable(resource)) {
+ SparkLauncher.NO_RESOURCE
+ } else {
+ resource
+ }
+ }
+
+ def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
+ conf match {
+ case Some(sConf) =>
+ if (sConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
+ val fileUri = Utils.resolveURI(uri)
+ try {
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(sConf)
+ val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get
+ val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), hadoopConf)
+ val randomDirName = s"spark-upload-${UUID.randomUUID()}"
+ fs.mkdirs(new Path(s"${uploadPath}/${randomDirName}"))
+ val targetUri = s"${uploadPath}/${randomDirName}/${fileUri.getPath.split("/").last}"
+ log.info(s"Uploading file: ${fileUri.getPath} to dest: $targetUri...")
+ uploadFileToHadoopCompatibleFS(new Path(fileUri.getPath), new Path(targetUri), fs)
+ targetUri
+ } catch {
+ case e: Exception =>
+ throw new SparkException(s"Uploading file ${fileUri.getPath} failed...", e)
+ }
+ } else {
+ throw new SparkException("Please specify " +
+ "spark.kubernetes.file.upload.path property.")
+ }
+ case _ => throw new SparkException("Spark configuration is missing...")
+ }
+ }
+
+ /**
+ * Upload a file to a Hadoop-compatible filesystem.
+ */
+ private def uploadFileToHadoopCompatibleFS(
+ src: Path,
+ dest: Path,
+ fs: FileSystem,
+ delSrc : Boolean = false,
+ overwrite: Boolean = true): Unit = {
+ try {
+ fs.copyFromLocalFile(false, true, src, dest)
+ } catch {
+ case e: IOException =>
+ throw new SparkException(s"Error uploading file ${src.getName}", e)
+ }
+ }
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 61fc67f..92463df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.UI._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
@@ -156,6 +155,15 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
+ // try upload local, resolvable files to a hadoop compatible file system
+ Seq(JARS, FILES).foreach { key =>
+ val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
+ val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
+ if (resolved.nonEmpty) {
+ additionalProps.put(key.key, resolved.mkString(","))
+ }
+ }
additionalProps.toMap
}
}
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index 9c9cd1e..7faf0d7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -24,9 +24,7 @@ import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
-import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.util.Utils
/**
* Creates the driver command for running the user app, and propagates needed configuration so
@@ -88,11 +86,17 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
}
private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = {
+ // re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
+ val resolvedResource = if (conf.mainAppResource.isInstanceOf[JavaMainAppResource]) {
+ KubernetesUtils.renameMainAppResource(resource, conf.sparkConf)
+ } else {
+ resource
+ }
new ContainerBuilder(pod.container)
.addToArgs("driver")
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", conf.mainClass)
- .addToArgs(resource)
+ .addToArgs(resolvedResource)
.addToArgs(conf.appArgs: _*)
}
}
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 2248482..d129ffb 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -76,6 +76,12 @@
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>1.7.4</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
new file mode 100644
index 0000000..b0c4182
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net.URL
+
+import scala.collection.JavaConverters._
+
+import com.amazonaws.auth.BasicAWSCredentials
+import com.amazonaws.services.s3.AmazonS3Client
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, FILE_CONTENTS, HOST_PATH}
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT}
+import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
+
+private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
+ import KubernetesSuite.k8sTestTag
+
+ val cName = "ceph-nano"
+ val svcName = s"$cName-s3"
+ val bucket = "spark"
+
+ private def getCephContainer(): Container = {
+ val envVars = Map ( "NETWORK_AUTO_DETECT" -> "4",
+ "RGW_CIVETWEB_PORT" -> "8000",
+ "SREE_PORT" -> "5001",
+ "CEPH_DEMO_UID" -> "nano",
+ "CEPH_DAEMON" -> "demo",
+ "DEBUG" -> "verbose"
+ ).map( envV =>
+ new EnvVarBuilder()
+ .withName(envV._1)
+ .withValue(envV._2)
+ .build()
+ ).toArray
+
+ val resources = Map(
+ "cpu" -> new QuantityBuilder()
+ .withAmount("1")
+ .build(),
+ "memory" -> new QuantityBuilder()
+ .withAmount("512M")
+ .build()
+ ).asJava
+
+ new ContainerBuilder()
+ .withImage("ceph/daemon:v4.0.0-stable-4.0-master-centos-7-x86_64")
+ .withImagePullPolicy("Always")
+ .withName(cName)
+ .withPorts(new ContainerPortBuilder()
+ .withName(svcName)
+ .withProtocol("TCP")
+ .withContainerPort(8000)
+ .build()
+ )
+ .withResources(new ResourceRequirementsBuilder()
+ .withLimits(resources)
+ .withRequests(resources)
+ .build()
+ )
+ .withEnv(envVars: _*)
+ .build()
+ }
+
+ // Based on https://github.com/ceph/cn
+ private def setupCephStorage(): Unit = {
+ val labels = Map("app" -> "ceph", "daemon" -> "nano").asJava
+ val cephService = new ServiceBuilder()
+ .withNewMetadata()
+ .withName(svcName)
+ .withLabels(labels)
+ .endMetadata()
+ .withNewSpec()
+ .withPorts(new ServicePortBuilder()
+ .withName("https")
+ .withPort(8000)
+ .withProtocol("TCP")
+ .withTargetPort(new IntOrString(8000))
+ .build()
+ )
+ .withType("NodePort")
+ .withSelector(labels)
+ .endSpec()
+ .build()
+
+ val cephStatefulSet = new StatefulSetBuilder()
+ .withNewMetadata()
+ .withName(cName)
+ .withLabels(labels)
+ .endMetadata()
+ .withNewSpec()
+ .withReplicas(1)
+ .withNewSelector()
+ .withMatchLabels(Map("app" -> "ceph").asJava)
+ .endSelector()
+ .withServiceName(cName)
+ .withNewTemplate()
+ .withNewMetadata()
+ .withName(cName)
+ .withLabels(labels)
+ .endMetadata()
+ .withNewSpec()
+ .withContainers(getCephContainer())
+ .endSpec()
+ .endTemplate()
+ .endSpec()
+ .build()
+
+ kubernetesTestComponents
+ .kubernetesClient
+ .services()
+ .create(cephService)
+
+ kubernetesTestComponents
+ .kubernetesClient
+ .apps()
+ .statefulSets()
+ .create(cephStatefulSet)
+ }
+
+ private def deleteCephStorage(): Unit = {
+ kubernetesTestComponents
+ .kubernetesClient
+ .apps()
+ .statefulSets()
+ .withName(cName)
+ .delete()
+
+ kubernetesTestComponents
+ .kubernetesClient
+ .services()
+ .withName(svcName)
+ .delete()
+ }
+
+ test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
+ val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
+ try {
+ setupCephStorage()
+ val cephUrlStr = getServiceUrl(svcName)
+ val cephUrl = new URL(cephUrlStr)
+ val cephHost = cephUrl.getHost
+ val cephPort = cephUrl.getPort
+ val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
+ val (accessKey, secretKey) = getCephCredentials()
+ sparkAppConf
+ .set("spark.hadoop.fs.s3a.access.key", accessKey)
+ .set("spark.hadoop.fs.s3a.secret.key", secretKey)
+ .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+ .set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort")
+ .set("spark.kubernetes.file.upload.path", s"s3a://$bucket")
+ .set("spark.files", s"$HOST_PATH/$fileName")
+ .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+ .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
+ "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
+ .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
+ createS3Bucket(accessKey, secretKey, cephUrlStr)
+ runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
+ appArgs = Array(fileName),
+ timeout = Option(DEPS_TIMEOUT))
+ } finally {
+ // make sure this always runs
+ deleteCephStorage()
+ }
+ }
+
+ // There isn't a cleaner way to get the credentials
+ // when ceph-nano runs on k8s
+ private def getCephCredentials(): (String, String) = {
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ val cephPod = kubernetesTestComponents
+ .kubernetesClient
+ .pods()
+ .withName(s"$cName-0")
+ .get()
+ implicit val podName: String = cephPod.getMetadata.getName
+ implicit val components: KubernetesTestComponents = kubernetesTestComponents
+ val contents = Utils.executeCommand("cat", "/nano_user_details")
+ (extractS3Key(contents, "access_key"), extractS3Key(contents, "secret_key"))
+ }
+ }
+
+ private def extractS3Key(data: String, key: String): String = {
+ data.split("\n")
+ .filter(_.contains(key))
+ .head
+ .split(":")
+ .last
+ .trim
+ .replaceAll("[,|\"]", "")
+ }
+
+ private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = {
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ try {
+ val credentials = new BasicAWSCredentials(accessKey, secretKey)
+ val s3client = new AmazonS3Client(credentials)
+ s3client.setEndpoint(endPoint)
+ s3client.createBucket(bucket)
+ } catch {
+ case e: Exception =>
+ throw new SparkException(s"Failed to create bucket $bucket.", e)
+ }
+ }
+ }
+
+ private def getServiceUrl(serviceName: String): String = {
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ // ns is always available either random or provided by the user
+ Minikube.minikubeServiceAction(serviceName, "-n", kubernetesTestComponents.namespace, "--url")
+ }
+ }
+}
+
+private[spark] object DepsTestsSuite {
+ val HOST_PATH = "/tmp"
+ val FILE_CONTENTS = "test deps"
+ // increase the default because jar resolution takes time in the container
+ val DEPS_TIMEOUT = PatienceConfiguration.Timeout(Span(4, Minutes))
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index bc0bb20..51e758f 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -30,9 +30,10 @@ import io.fabric8.kubernetes.client.Watcher.Action
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag}
import org.scalatest.Matchers
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.concurrent.PatienceConfiguration.{Interval, Timeout}
import org.scalatest.time.{Minutes, Seconds, Span}
-import org.apache.spark.{SPARK_VERSION, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
import org.apache.spark.internal.Logging
@@ -41,7 +42,7 @@ import org.apache.spark.internal.config._
class KubernetesSuite extends SparkFunSuite
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
- with Logging with Eventually with Matchers {
+ with DepsTestsSuite with Logging with Eventually with Matchers {
import KubernetesSuite._
@@ -120,12 +121,8 @@ class KubernetesSuite extends SparkFunSuite
pyImage = testImageRef(sys.props.getOrElse(CONFIG_KEY_IMAGE_PYTHON, "spark-py"))
rImage = testImageRef(sys.props.getOrElse(CONFIG_KEY_IMAGE_R, "spark-r"))
- val scalaVersion = scala.util.Properties.versionNumberString
- .split("\\.")
- .take(2)
- .mkString(".")
containerLocalSparkDistroExamplesJar =
- s"local:///opt/spark/examples/jars/spark-examples_$scalaVersion-${SPARK_VERSION}.jar"
+ s"local:///opt/spark/examples/jars/${Utils.getExamplesJarName()}"
testBackend = IntegrationTestBackendFactory.getTestBackend
testBackend.initialize()
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
@@ -198,7 +195,7 @@ class KubernetesSuite extends SparkFunSuite
appLocator,
isJVM,
None,
- interval)
+ Option((interval, None)))
}
protected def runSparkRemoteCheckAndVerifyCompletion(
@@ -206,7 +203,8 @@ class KubernetesSuite extends SparkFunSuite
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
appArgs: Array[String],
- appLocator: String = appLocator): Unit = {
+ appLocator: String = appLocator,
+ timeout: Option[PatienceConfiguration.Timeout] = None): Unit = {
runSparkApplicationAndVerifyCompletion(
appResource,
SPARK_REMOTE_MAIN_CLASS,
@@ -215,7 +213,8 @@ class KubernetesSuite extends SparkFunSuite
driverPodChecker,
executorPodChecker,
appLocator,
- true)
+ true,
+ executorPatience = Option((None, timeout)))
}
protected def runSparkJVMCheckAndVerifyCompletion(
@@ -265,7 +264,7 @@ class KubernetesSuite extends SparkFunSuite
appLocator: String,
isJVM: Boolean,
pyFiles: Option[String] = None,
- interval: Option[PatienceConfiguration.Interval] = None): Unit = {
+ executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = {
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
@@ -306,8 +305,16 @@ class KubernetesSuite extends SparkFunSuite
}
})
- val patienceInterval = interval.getOrElse(INTERVAL)
- Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) }
+ val (patienceInterval, patienceTimeout) = {
+ executorPatience match {
+ case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
+ case _ => (INTERVAL, TIMEOUT)
+ }
+ }
+
+ Eventually.eventually(patienceTimeout, patienceInterval) {
+ execPods.values.nonEmpty should be (true)
+ }
execWatcher.close()
execPods.values.foreach(executorPodChecker(_))
Eventually.eventually(TIMEOUT, patienceInterval) {
@@ -408,6 +415,7 @@ class KubernetesSuite extends SparkFunSuite
private[spark] object KubernetesSuite {
val k8sTestTag = Tag("k8s")
+ val MinikubeTag = Tag("minikube")
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest"
val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 50a7ef7..4cfda8a 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.scalatest.concurrent.Eventually
+import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.JARS
@@ -93,6 +94,8 @@ private[spark] class SparkAppConf {
override def toString: String = map.toString
def toStringArray: Iterable[String] = map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}"))
+
+ def toSparkConf: SparkConf = new SparkConf().setAll(map)
}
private[spark] case class SparkAppArguments(
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
index d7a237f..7776bc6 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
@@ -16,13 +16,10 @@
*/
package org.apache.spark.deploy.k8s.integrationtest
-import java.io.{File, PrintWriter}
-
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder
-import org.scalatest.Tag
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Milliseconds, Span}
@@ -125,25 +122,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
}
}
- private def createTempFile(): String = {
- val filename = try {
- val f = File.createTempFile("tmp", ".txt", new File(HOST_PATH))
- f.deleteOnExit()
- new PrintWriter(f) {
- try {
- write(FILE_CONTENTS)
- } finally {
- close()
- }
- }
- f.getName
- } catch {
- case e: Exception => e.printStackTrace(); throw e;
- }
- filename
- }
-
- test("Test PVs with local storage", k8sTestTag, MinikubeTag) {
+ test("PVs with local storage", k8sTestTag, MinikubeTag) {
sparkAppConf
.set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path",
CONTAINER_MOUNT_PATH)
@@ -153,7 +132,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
CONTAINER_MOUNT_PATH)
.set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName",
PVC_NAME)
- val file = createTempFile()
+ val file = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
try {
setupLocalStorage()
runDFSReadWriteAndVerifyCompletion(
@@ -170,14 +149,13 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
interval = Some(PV_TESTS_INTERVAL)
)
} finally {
- // make sure this always run
+ // make sure this always runs
deleteLocalStorage()
}
}
}
private[spark] object PVTestsSuite {
- val MinikubeTag = Tag("minikube")
val STORAGE_NAME = "test-local-storage"
val PV_NAME = "test-local-pv"
val PVC_NAME = "test-local-pvc"
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index d425f70..a687a1b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -16,15 +16,26 @@
*/
package org.apache.spark.deploy.k8s.integrationtest
-import java.io.Closeable
-import java.net.URI
+import java.io.{Closeable, File, PrintWriter}
+import java.nio.file.{Files, Path}
+
+import scala.collection.JavaConverters._
import org.apache.commons.io.output.ByteArrayOutputStream
+import org.apache.spark.{SPARK_VERSION, SparkException}
import org.apache.spark.internal.Logging
object Utils extends Logging {
+ def getExamplesJarName(): String = {
+ val scalaVersion = scala.util.Properties.versionNumberString
+ .split("\\.")
+ .take(2)
+ .mkString(".")
+ s"spark-examples_$scalaVersion-${SPARK_VERSION}.jar"
+ }
+
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
val resource = createResource
try f.apply(resource) finally resource.close()
@@ -49,4 +60,43 @@ object Utils extends Logging {
out.flush()
out.toString()
}
+
+ def createTempFile(contents: String, hostPath: String): String = {
+ val filename = try {
+ val f = File.createTempFile("tmp", ".txt", new File(hostPath))
+ f.deleteOnExit()
+ new PrintWriter(f) {
+ try {
+ write(contents)
+ } finally {
+ close()
+ }
+ }
+ f.getName
+ } catch {
+ case e: Exception => e.printStackTrace(); throw e;
+ }
+ filename
+ }
+
+ def getExamplesJarAbsolutePath(sparkHomeDir: Path): String = {
+ val jarName = getExamplesJarName()
+ val jarPathsFound = Files
+ .walk(sparkHomeDir)
+ .filter(Files.isRegularFile(_))
+ .filter((f: Path) => {f.toFile.getName == jarName})
+ // we should not have more than one here under current test build dir
+ // we only need one though
+ val jarPath = jarPathsFound
+ .iterator()
+ .asScala
+ .map(_.toAbsolutePath.toString)
+ .toArray
+ .headOption
+ jarPath match {
+ case Some(jar) => jar
+ case _ => throw new SparkException(s"No valid $jarName file was found " +
+ s"under spark home test dir ${sparkHomeDir.toAbsolutePath}!")
+ }
+ }
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 78ef44b..ce2ce1c 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -112,7 +112,12 @@ private[spark] object Minikube extends Logging {
private def executeMinikube(action: String, args: String*): Seq[String] = {
ProcessUtils.executeProcess(
- Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
+ Array("bash", "-c", s"minikube $action ${args.mkString(" ")}"),
+ MINIKUBE_STARTUP_TIMEOUT_SECONDS)
+ }
+
+ def minikubeServiceAction(args: String*): String = {
+ executeMinikube("service", args: _*).head
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org