Posted to commits@spark.apache.org by mc...@apache.org on 2018/03/19 18:30:23 UTC
[2/2] spark git commit: [SPARK-22839][K8S] Remove the use of init-container for downloading remote dependencies
## What changes were proposed in this pull request?
This change removes the init-container used for downloading remote dependencies. It builds on work done by vanzin to refactor driver/executor configuration, as elaborated in [this](https://issues.apache.org/jira/browse/SPARK-22839) ticket.
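As a hedged illustration (host, image, and paths are placeholders), the same kind of submission with remote dependencies still works after this change; the jars and files are now fetched by the driver and executors themselves at startup instead of by an init-container:

```bash
$ bin/spark-submit \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --jars https://path/to/dependency1.jar,https://path/to/dependency2.jar \
  --files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2 \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=<spark-image> \
  https://path/to/examples.jar
```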
## How was this patch tested?
This patch was tested with unit and integration tests.
Author: Ilan Filonenko <if...@cornell.edu>
Closes #20669 from ifilonenko/remove-init-container.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f15906da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f15906da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f15906da
Branch: refs/heads/master
Commit: f15906da153f139b698e192ec6f82f078f896f1e
Parents: 4de638c
Author: Ilan Filonenko <if...@cornell.edu>
Authored: Mon Mar 19 11:29:56 2018 -0700
Committer: mcheah <mc...@palantir.com>
Committed: Mon Mar 19 11:29:56 2018 -0700
----------------------------------------------------------------------
bin/docker-image-tool.sh | 9 +-
.../org/apache/spark/deploy/SparkSubmit.scala | 2 -
docs/running-on-kubernetes.md | 71 +-------
.../spark/examples/SparkRemoteFileTest.scala | 48 ++++++
.../org/apache/spark/deploy/k8s/Config.scala | 73 +--------
.../org/apache/spark/deploy/k8s/Constants.scala | 21 +--
.../deploy/k8s/InitContainerBootstrap.scala | 120 --------------
.../spark/deploy/k8s/KubernetesUtils.scala | 63 +-------
.../k8s/PodWithDetachedInitContainer.scala | 31 ----
.../deploy/k8s/SparkPodInitContainer.scala | 116 --------------
.../k8s/submit/DriverConfigOrchestrator.scala | 45 +-----
.../submit/KubernetesClientApplication.scala | 84 ++++++----
.../steps/BasicDriverConfigurationStep.scala | 32 ++--
.../submit/steps/DependencyResolutionStep.scala | 18 +--
.../DriverInitContainerBootstrapStep.scala | 95 -----------
.../steps/DriverKubernetesCredentialsStep.scala | 2 +-
.../BasicInitContainerConfigurationStep.scala | 67 --------
.../InitContainerConfigOrchestrator.scala | 79 ---------
.../InitContainerConfigurationStep.scala | 25 ---
.../InitContainerMountSecretsStep.scala | 36 -----
.../steps/initcontainer/InitContainerSpec.scala | 37 -----
.../cluster/k8s/ExecutorPodFactory.scala | 43 +----
.../cluster/k8s/KubernetesClusterManager.scala | 65 +-------
.../deploy/k8s/SparkPodInitContainerSuite.scala | 86 ----------
.../spark/deploy/k8s/submit/ClientSuite.scala | 82 +++++-----
.../submit/DriverConfigOrchestratorSuite.scala | 41 +----
.../BasicDriverConfigurationStepSuite.scala | 8 +-
.../steps/DependencyResolutionStepSuite.scala | 32 ++--
.../DriverInitContainerBootstrapStepSuite.scala | 160 -------------------
...sicInitContainerConfigurationStepSuite.scala | 95 -----------
.../InitContainerConfigOrchestratorSuite.scala | 80 ----------
.../InitContainerMountSecretsStepSuite.scala | 52 ------
.../cluster/k8s/ExecutorPodFactorySuite.scala | 67 ++------
.../src/main/dockerfiles/spark/Dockerfile | 1 -
.../src/main/dockerfiles/spark/entrypoint.sh | 20 +--
35 files changed, 241 insertions(+), 1665 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/bin/docker-image-tool.sh
----------------------------------------------------------------------
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index 0d0f564..f090240 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -64,9 +64,11 @@ function build {
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi
+ local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+
docker build "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
- -f "$IMG_PATH/spark/Dockerfile" .
+ -f "$DOCKERFILE" .
}
function push {
@@ -84,6 +86,7 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.
Options:
+ -f file Dockerfile to build. By default builds the Dockerfile shipped with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
@@ -113,10 +116,12 @@ fi
REPO=
TAG=
-while getopts mr:t: option
+DOCKERFILE=
+while getopts f:mr:t: option
do
case "${option}"
in
+ f) DOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
m)
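A hedged usage sketch of the new `-f` option (repository and tag are placeholders; `build` and `push` are the script's existing commands):

```bash
# Build the image from a custom Dockerfile instead of the one shipped with
# Spark, then push it to the given repository.
$ ./bin/docker-image-tool.sh -r docker.io/myrepo -t v2.3.0 -f /path/to/custom/Dockerfile build
$ ./bin/docker-image-tool.sh -r docker.io/myrepo -t v2.3.0 push
```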
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1e38196..329bde0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -320,8 +320,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
- case (KUBERNETES, CLIENT) =>
- printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 3c7586e..975b28d 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -126,29 +126,6 @@ Those dependencies can be added to the classpath by referencing them with `local
dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
client's local file system is currently not yet supported.
-
-### Using Remote Dependencies
-When there are application dependencies hosted in remote locations like HDFS or HTTP servers, the driver and executor pods
-need a Kubernetes [init-container](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/) for downloading
-the dependencies so the driver and executor containers can use them locally.
-
-The init-container handles remote dependencies specified in `spark.jars` (or the `--jars` option of `spark-submit`) and
-`spark.files` (or the `--files` option of `spark-submit`). It also handles remotely hosted main application resources, e.g.,
-the main application jar. The following shows an example of using remote dependencies with the `spark-submit` command:
-
-```bash
-$ bin/spark-submit \
- --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
- --deploy-mode cluster \
- --name spark-pi \
- --class org.apache.spark.examples.SparkPi \
- --jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
- --files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
- --conf spark.executor.instances=5 \
- --conf spark.kubernetes.container.image=<spark-image> \
- https://path/to/examples.jar
-```
-
## Secret Management
Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
Spark application to access secured services. To mount a user-specified secret into the driver container, users can use
@@ -163,10 +140,6 @@ namespace as that of the driver and executor pods. For example, to mount a secre
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
```
-Note that if an init-container is used, any secret mounted into the driver container will also be mounted into the
-init-container of the driver. Similarly, any secret mounted into an executor container will also be mounted into the
-init-container of the executor.
-
## Introspection and Debugging
These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -605,50 +578,11 @@ specific to Spark on Kubernetes.
</td>
</tr>
<tr>
- <td><code>spark.kubernetes.mountDependencies.jarsDownloadDir</code></td>
- <td><code>/var/spark-data/spark-jars</code></td>
- <td>
- Location to download jars to in the driver and executors.
- This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
- </td>
-</tr>
-<tr>
- <td><code>spark.kubernetes.mountDependencies.filesDownloadDir</code></td>
- <td><code>/var/spark-data/spark-files</code></td>
- <td>
- Location to download files to in the driver and executors.
- This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
- </td>
-</tr>
-<tr>
- <td><code>spark.kubernetes.mountDependencies.timeout</code></td>
- <td>300s</td>
- <td>
- Timeout in seconds before aborting the attempt to download and unpack dependencies from remote locations into
- the driver and executor pods.
- </td>
-</tr>
-<tr>
- <td><code>spark.kubernetes.mountDependencies.maxSimultaneousDownloads</code></td>
- <td>5</td>
- <td>
- Maximum number of remote dependencies to download simultaneously in a driver or executor pod.
- </td>
-</tr>
-<tr>
- <td><code>spark.kubernetes.initContainer.image</code></td>
- <td><code>(value of spark.kubernetes.container.image)</code></td>
- <td>
- Custom container image for the init container of both driver and executors.
- </td>
-</tr>
-<tr>
<td><code>spark.kubernetes.driver.secrets.[SecretName]</code></td>
<td>(none)</td>
<td>
Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the driver pod on the path specified in the value. For example,
- <code>spark.kubernetes.driver.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
- the secret will also be added to the init-container in the driver pod.
+ <code>spark.kubernetes.driver.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
<tr>
@@ -656,8 +590,7 @@ specific to Spark on Kubernetes.
<td>(none)</td>
<td>
Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the executor pod on the path specified in the value. For example,
- <code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
- the secret will also be added to the init-container in the executor pod.
+ <code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
</table>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala b/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
new file mode 100644
index 0000000..64076f2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import java.io.File
+
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.SparkSession
+
+/** Usage: SparkRemoteFileTest [file] */
+object SparkRemoteFileTest {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: SparkRemoteFileTest <file>")
+ System.exit(1)
+ }
+ val spark = SparkSession
+ .builder()
+ .appName("SparkRemoteFileTest")
+ .getOrCreate()
+ val sc = spark.sparkContext
+ val rdd = sc.parallelize(Seq(1)).map(_ => {
+ val localLocation = SparkFiles.get(args(0))
+ println(s"${args(0)} is stored at: $localLocation")
+ new File(localLocation).isFile
+ })
+ val truthCheck = rdd.collect().head
+ println(s"Mounting of ${args(0)} was $truthCheck")
+ spark.stop()
+ }
+}
+// scalastyle:on println
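A hedged example of running this test class against a remotely hosted file (host, image, and paths are placeholders); the single argument is the file's base name, which `SparkFiles.get` resolves to its on-disk location inside the executor:

```bash
$ bin/spark-submit \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkRemoteFileTest \
  --files https://<host>/path/to/test-file.txt \
  --conf spark.kubernetes.container.image=<spark-image> \
  https://<host>/path/to/spark-examples.jar \
  test-file.txt
```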
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 471196a..da34a7e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -79,6 +79,12 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional
+ val KUBERNETES_DRIVER_SUBMIT_CHECK =
+ ConfigBuilder("spark.kubernetes.submitInDriver")
+ .internal()
+ .booleanConf
+ .createOptional
+
val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for each executor pod")
@@ -135,73 +141,6 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")
- val JARS_DOWNLOAD_LOCATION =
- ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
- .doc("Location to download jars to in the driver and executors. When using " +
- "spark-submit, this directory must be empty and will be mounted as an empty directory " +
- "volume on the driver and executor pod.")
- .stringConf
- .createWithDefault("/var/spark-data/spark-jars")
-
- val FILES_DOWNLOAD_LOCATION =
- ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
- .doc("Location to download files to in the driver and executors. When using " +
- "spark-submit, this directory must be empty and will be mounted as an empty directory " +
- "volume on the driver and executor pods.")
- .stringConf
- .createWithDefault("/var/spark-data/spark-files")
-
- val INIT_CONTAINER_IMAGE =
- ConfigBuilder("spark.kubernetes.initContainer.image")
- .doc("Image for the driver and executor's init-container for downloading dependencies.")
- .fallbackConf(CONTAINER_IMAGE)
-
- val INIT_CONTAINER_MOUNT_TIMEOUT =
- ConfigBuilder("spark.kubernetes.mountDependencies.timeout")
- .doc("Timeout before aborting the attempt to download and unpack dependencies from remote " +
- "locations into the driver and executor pods.")
- .timeConf(TimeUnit.SECONDS)
- .createWithDefault(300)
-
- val INIT_CONTAINER_MAX_THREAD_POOL_SIZE =
- ConfigBuilder("spark.kubernetes.mountDependencies.maxSimultaneousDownloads")
- .doc("Maximum number of remote dependencies to download simultaneously in a driver or " +
- "executor pod.")
- .intConf
- .createWithDefault(5)
-
- val INIT_CONTAINER_REMOTE_JARS =
- ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
- .doc("Comma-separated list of jar URIs to download in the init-container. This is " +
- "calculated from spark.jars.")
- .internal()
- .stringConf
- .createOptional
-
- val INIT_CONTAINER_REMOTE_FILES =
- ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
- .doc("Comma-separated list of file URIs to download in the init-container. This is " +
- "calculated from spark.files.")
- .internal()
- .stringConf
- .createOptional
-
- val INIT_CONTAINER_CONFIG_MAP_NAME =
- ConfigBuilder("spark.kubernetes.initContainer.configMapName")
- .doc("Name of the config map to use in the init-container that retrieves submitted files " +
- "for the executor.")
- .internal()
- .stringConf
- .createOptional
-
- val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
- ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
- .doc("Key for the entry in the init container config map for submitted files that " +
- "corresponds to the properties for this init-container.")
- .internal()
- .stringConf
- .createOptional
-
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 9411956..8da5f24 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -63,22 +63,13 @@ private[spark] object Constants {
val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
val ENV_CLASSPATH = "SPARK_CLASSPATH"
- val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
- val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
- val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
- val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
- val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
-
- // Bootstrapping dependencies with the init-container
- val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
- val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files-volume"
- val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
- val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
- val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
- val INIT_CONTAINER_PROPERTIES_FILE_PATH =
- s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
- val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
+ val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
+ // Spark app configs for containers
+ val SPARK_CONF_VOLUME = "spark-conf-volume"
+ val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
+ val SPARK_CONF_FILE_NAME = "spark.properties"
+ val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
deleted file mode 100644
index f6a57df..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-
-/**
- * Bootstraps an init-container for downloading remote dependencies. This is separated out from
- * the init-container steps API because this component can be used to bootstrap init-containers
- * for both the driver and executors.
- */
-private[spark] class InitContainerBootstrap(
- initContainerImage: String,
- imagePullPolicy: String,
- jarsDownloadPath: String,
- filesDownloadPath: String,
- configMapName: String,
- configMapKey: String,
- sparkRole: String,
- sparkConf: SparkConf) {
-
- /**
- * Bootstraps an init-container that downloads dependencies to be used by a main container.
- */
- def bootstrapInitContainer(
- original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
- val sharedVolumeMounts = Seq[VolumeMount](
- new VolumeMountBuilder()
- .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
- .withMountPath(jarsDownloadPath)
- .build(),
- new VolumeMountBuilder()
- .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
- .withMountPath(filesDownloadPath)
- .build())
-
- val customEnvVarKeyPrefix = sparkRole match {
- case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
- case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
- case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
- }
- val customEnvVars = sparkConf.getAllWithPrefix(customEnvVarKeyPrefix).toSeq.map {
- case (key, value) =>
- new EnvVarBuilder()
- .withName(key)
- .withValue(value)
- .build()
- }
-
- val initContainer = new ContainerBuilder(original.initContainer)
- .withName("spark-init")
- .withImage(initContainerImage)
- .withImagePullPolicy(imagePullPolicy)
- .addAllToEnv(customEnvVars.asJava)
- .addNewVolumeMount()
- .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
- .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
- .endVolumeMount()
- .addToVolumeMounts(sharedVolumeMounts: _*)
- .addToArgs("init")
- .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
- .build()
-
- val podWithBasicVolumes = new PodBuilder(original.pod)
- .editSpec()
- .addNewVolume()
- .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
- .withNewConfigMap()
- .withName(configMapName)
- .addNewItem()
- .withKey(configMapKey)
- .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
- .endItem()
- .endConfigMap()
- .endVolume()
- .addNewVolume()
- .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
- .withEmptyDir(new EmptyDirVolumeSource())
- .endVolume()
- .addNewVolume()
- .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
- .withEmptyDir(new EmptyDirVolumeSource())
- .endVolume()
- .endSpec()
- .build()
-
- val mainContainer = new ContainerBuilder(original.mainContainer)
- .addToVolumeMounts(sharedVolumeMounts: _*)
- .addNewEnv()
- .withName(ENV_MOUNTED_FILES_DIR)
- .withValue(filesDownloadPath)
- .endEnv()
- .build()
-
- PodWithDetachedInitContainer(
- podWithBasicVolumes,
- initContainer,
- mainContainer)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 37331d8..5bc0701 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,10 +16,6 @@
*/
package org.apache.spark.deploy.k8s
-import java.io.File
-
-import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
-
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
@@ -44,71 +40,22 @@ private[spark] object KubernetesUtils {
}
/**
- * Append the given init-container to a pod's list of init-containers.
- *
- * @param originalPodSpec original specification of the pod
- * @param initContainer the init-container to add to the pod
- * @return the pod with the init-container added to the list of InitContainers
- */
- def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
- new PodBuilder(originalPodSpec)
- .editOrNewSpec()
- .addToInitContainers(initContainer)
- .endSpec()
- .build()
- }
-
- /**
* For the given collection of file URIs, resolves them as follows:
- * - File URIs with scheme file:// are resolved to the given download path.
* - File URIs with scheme local:// resolve to just the path of the URI.
* - Otherwise, the URIs are returned as-is.
*/
- def resolveFileUris(
- fileUris: Iterable[String],
- fileDownloadPath: String): Iterable[String] = {
- fileUris.map { uri =>
- resolveFileUri(uri, fileDownloadPath, false)
- }
- }
-
- /**
- * If any file uri has any scheme other than local:// it is mapped as if the file
- * was downloaded to the file download path. Otherwise, it is mapped to the path
- * part of the URI.
- */
- def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
+ def resolveFileUrisAndPath(fileUris: Iterable[String]): Iterable[String] = {
fileUris.map { uri =>
- resolveFileUri(uri, fileDownloadPath, true)
- }
- }
-
- /**
- * Get from a given collection of file URIs the ones that represent remote files.
- */
- def getOnlyRemoteFiles(uris: Iterable[String]): Iterable[String] = {
- uris.filter { uri =>
- val scheme = Utils.resolveURI(uri).getScheme
- scheme != "file" && scheme != "local"
+ resolveFileUri(uri)
}
}
- private def resolveFileUri(
- uri: String,
- fileDownloadPath: String,
- assumesDownloaded: Boolean): String = {
+ private def resolveFileUri(uri: String): String = {
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
- case "local" =>
- fileUri.getPath
- case _ =>
- if (assumesDownloaded || fileScheme == "file") {
- val fileName = new File(fileUri.getPath).getName
- s"$fileDownloadPath/$fileName"
- } else {
- uri
- }
+ case "local" => fileUri.getPath
+ case _ => uri
}
}
}
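With the download-path rewriting gone, only the `local://` scheme is resolved (to the in-container path); every other URI is passed through for Spark to fetch itself at startup. A hedged sketch using a jar baked into the image (image name and jar path are placeholders):

```bash
# local:///... resolves to /opt/spark/examples/jars/spark-examples.jar inside
# the container; no download step is involved.
$ bin/spark-submit \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=<spark-image> \
  local:///opt/spark/examples/jars/spark-examples.jar
```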
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
deleted file mode 100644
index 0b79f8b..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import io.fabric8.kubernetes.api.model.{Container, Pod}
-
-/**
- * Represents a pod with a detached init-container (not yet added to the pod).
- *
- * @param pod the pod
- * @param initContainer the init-container in the pod
- * @param mainContainer the main container in the pod
- */
-private[spark] case class PodWithDetachedInitContainer(
- pod: Pod,
- initContainer: Container,
- mainContainer: Container)
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
deleted file mode 100644
index c0f0878..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-/**
- * Process that fetches files from a resource staging server and/or arbitrary remote locations.
- *
- * The init-container can handle fetching files from any of those sources, but not all of the
- * sources need to be specified. This allows for composing multiple instances of this container
- * with different configurations for different download sources, or using the same container to
- * download everything at once.
- */
-private[spark] class SparkPodInitContainer(
- sparkConf: SparkConf,
- fileFetcher: FileFetcher) extends Logging {
-
- private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE)
- private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize))
-
- private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION))
- private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION))
-
- private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS)
- private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES)
-
- private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
-
- def run(): Unit = {
- logInfo(s"Downloading remote jars: $remoteJars")
- downloadFiles(
- remoteJars,
- jarsDownloadDir,
- s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
- "or is not a directory.")
-
- logInfo(s"Downloading remote files: $remoteFiles")
- downloadFiles(
- remoteFiles,
- filesDownloadDir,
- s"Remote files download directory specified at $filesDownloadDir does not exist " +
- "or is not a directory.")
-
- downloadExecutor.shutdown()
- downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
- }
-
- private def downloadFiles(
- filesCommaSeparated: Option[String],
- downloadDir: File,
- errMessage: String): Unit = {
- filesCommaSeparated.foreach { files =>
- require(downloadDir.isDirectory, errMessage)
- Utils.stringToSeq(files).foreach { file =>
- Future[Unit] {
- fileFetcher.fetchFile(file, downloadDir)
- }
- }
- }
- }
-}
-
-private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) {
-
- def fetchFile(uri: String, targetDir: File): Unit = {
- Utils.fetchFile(
- url = uri,
- targetDir = targetDir,
- conf = sparkConf,
- securityMgr = securityManager,
- hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
- timestamp = System.currentTimeMillis(),
- useCache = false)
- }
-}
-
-object SparkPodInitContainer extends Logging {
-
- def main(args: Array[String]): Unit = {
- logInfo("Starting init-container to download Spark application dependencies.")
- val sparkConf = new SparkConf(true)
- if (args.nonEmpty) {
- Utils.loadDefaultSparkProperties(sparkConf, args(0))
- }
-
- val securityManager = new SparkSecurityManager(sparkConf)
- val fileFetcher = new FileFetcher(sparkConf, securityManager)
- new SparkPodInitContainer(sparkConf, fileFetcher).run()
- logInfo("Finished downloading application dependencies.")
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
index ae70904..b4d3f04 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
@@ -16,16 +16,11 @@
*/
package org.apache.spark.deploy.k8s.submit
-import java.util.UUID
-
-import com.google.common.primitives.Longs
-
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.steps._
-import org.apache.spark.deploy.k8s.submit.steps.initcontainer.InitContainerConfigOrchestrator
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.SystemClock
import org.apache.spark.util.Utils
@@ -34,13 +29,11 @@ import org.apache.spark.util.Utils
* Figures out and returns the complete ordered list of needed DriverConfigurationSteps to
* configure the Spark driver pod. The returned steps will be applied one by one in the given
* order to produce a final KubernetesDriverSpec that is used in KubernetesClientApplication
- * to construct and create the driver pod. It uses the InitContainerConfigOrchestrator to
- * configure the driver init-container if one is needed, i.e., when there are remote dependencies
- * to localize.
+ * to construct and create the driver pod.
*/
private[spark] class DriverConfigOrchestrator(
kubernetesAppId: String,
- launchTime: Long,
+ kubernetesResourceNamePrefix: String,
mainAppResource: Option[MainAppResource],
appName: String,
mainClass: String,
@@ -50,15 +43,8 @@ private[spark] class DriverConfigOrchestrator(
// The resource name prefix is derived from the Spark application name, making it easy to connect
// the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
// application the user submitted.
- private val kubernetesResourceNamePrefix = {
- val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)).toString.replaceAll("-", "")
- s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
- }
private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
- private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
- private val jarsDownloadPath = sparkConf.get(JARS_DOWNLOAD_LOCATION)
- private val filesDownloadPath = sparkConf.get(FILES_DOWNLOAD_LOCATION)
def getAllConfigurationSteps: Seq[DriverConfigurationStep] = {
val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -126,9 +112,7 @@ private[spark] class DriverConfigOrchestrator(
val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
Seq(new DependencyResolutionStep(
sparkJars,
- sparkFiles,
- jarsDownloadPath,
- filesDownloadPath))
+ sparkFiles))
} else {
Nil
}
@@ -139,33 +123,12 @@ private[spark] class DriverConfigOrchestrator(
Nil
}
- val initContainerBootstrapStep = if (existNonContainerLocalFiles(sparkJars ++ sparkFiles)) {
- val orchestrator = new InitContainerConfigOrchestrator(
- sparkJars,
- sparkFiles,
- jarsDownloadPath,
- filesDownloadPath,
- imagePullPolicy,
- initContainerConfigMapName,
- INIT_CONTAINER_PROPERTIES_FILE_NAME,
- sparkConf)
- val bootstrapStep = new DriverInitContainerBootstrapStep(
- orchestrator.getAllConfigurationSteps,
- initContainerConfigMapName,
- INIT_CONTAINER_PROPERTIES_FILE_NAME)
-
- Seq(bootstrapStep)
- } else {
- Nil
- }
-
Seq(
initialSubmissionStep,
serviceBootstrapStep,
kubernetesCredentialsStep) ++
dependencyResolutionStep ++
- mountSecretsStep ++
- initContainerBootstrapStep
+ mountSecretsStep
}
private def existSubmissionLocalFiles(files: Seq[String]): Boolean = {
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 5884348..e16d1ad 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -16,14 +16,14 @@
*/
package org.apache.spark.deploy.k8s.submit
+import java.io.StringWriter
import java.util.{Collections, UUID}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.control.NonFatal
+import java.util.Properties
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+import scala.util.control.NonFatal
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.util.Utils
/**
@@ -93,10 +94,8 @@ private[spark] class Client(
kubernetesClient: KubernetesClient,
waitForAppCompletion: Boolean,
appName: String,
- watcher: LoggingPodStatusWatcher) extends Logging {
-
- private val driverJavaOptions = sparkConf.get(
- org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+ watcher: LoggingPodStatusWatcher,
+ kubernetesResourceNamePrefix: String) extends Logging {
/**
* Run command that initializes a DriverSpec that will be updated after each
@@ -110,33 +109,31 @@ private[spark] class Client(
for (nextStep <- submissionSteps) {
currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
}
-
- val resolvedDriverJavaOpts = currentDriverSpec
- .driverSparkConf
- // Remove this as the options are instead extracted and set individually below using
- // environment variables with prefix SPARK_JAVA_OPT_.
- .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
- .getAll
- .map {
- case (confKey, confValue) => s"-D$confKey=$confValue"
- } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
- val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
- case (option, index) =>
- new EnvVarBuilder()
- .withName(s"$ENV_JAVA_OPT_PREFIX$index")
- .withValue(option)
- .build()
- }
-
+ val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
+ val configMap = buildConfigMap(configMapName, currentDriverSpec.driverSparkConf)
+ // The SPARK_CONF_DIR environment variable is included so that the Spark
+ // command builder picks up the Java options present in the ConfigMap
val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
- .addAllToEnv(driverJavaOptsEnvs.asJava)
+ .addNewEnv()
+ .withName(ENV_SPARK_CONF_DIR)
+ .withValue(SPARK_CONF_DIR_INTERNAL)
+ .endEnv()
+ .addNewVolumeMount()
+ .withName(SPARK_CONF_VOLUME)
+ .withMountPath(SPARK_CONF_DIR_INTERNAL)
+ .endVolumeMount()
.build()
val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
.editSpec()
.addToContainers(resolvedDriverContainer)
+ .addNewVolume()
+ .withName(SPARK_CONF_VOLUME)
+ .withNewConfigMap()
+ .withName(configMapName)
+ .endConfigMap()
+ .endVolume()
.endSpec()
.build()
-
Utils.tryWithResource(
kubernetesClient
.pods()
@@ -145,7 +142,8 @@ private[spark] class Client(
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
- val otherKubernetesResources = currentDriverSpec.otherKubernetesResources
+ val otherKubernetesResources =
+ currentDriverSpec.otherKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
}
@@ -180,6 +178,26 @@ private[spark] class Client(
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
}
}
+
+ // Build a Config Map that will house spark conf properties in a single file for spark-submit
+ private def buildConfigMap(configMapName: String, conf: SparkConf): ConfigMap = {
+ val properties = new Properties()
+ conf.getAll.foreach { case (k, v) =>
+ properties.setProperty(k, v)
+ }
+ val propertiesWriter = new StringWriter()
+ properties.store(propertiesWriter,
+ s"Java properties built from Kubernetes config map with name: $configMapName")
+
+ val namespace = conf.get(KUBERNETES_NAMESPACE)
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(configMapName)
+ .withNamespace(namespace)
+ .endMetadata()
+ .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
+ .build()
+ }
}
/**
@@ -202,6 +220,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val launchTime = System.currentTimeMillis()
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
+ val kubernetesResourceNamePrefix = {
+ s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+ }
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
val master = sparkConf.get("spark.master").substring("k8s://".length)
@@ -211,7 +232,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val orchestrator = new DriverConfigOrchestrator(
kubernetesAppId,
- launchTime,
+ kubernetesResourceNamePrefix,
clientArguments.mainAppResource,
appName,
clientArguments.mainClass,
@@ -231,7 +252,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
kubernetesClient,
waitForAppCompletion,
appName,
- watcher)
+ watcher,
+ kubernetesResourceNamePrefix)
client.run()
}
}
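The driver now receives its Spark configuration as a ConfigMap-backed properties file instead of a series of `SPARK_JAVA_OPT_<n>` environment variables. A hedged way to verify this on a running driver pod (the pod name is a placeholder):

```bash
# SPARK_CONF_VOLUME is mounted at SPARK_CONF_DIR_INTERNAL (/opt/spark/conf),
# with the properties stored under SPARK_CONF_FILE_NAME (spark.properties).
$ kubectl exec <driver-pod-name> -- cat /opt/spark/conf/spark.properties
```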
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
index 164e2e5..347c4d2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
+import org.apache.spark.launcher.SparkLauncher
/**
* Performs basic configuration for the driver pod.
@@ -56,8 +57,6 @@ private[spark] class BasicDriverConfigurationStep(
// Memory settings
private val driverMemoryMiB = sparkConf.get(DRIVER_MEMORY)
- private val driverMemoryString = sparkConf.get(
- DRIVER_MEMORY.key, DRIVER_MEMORY.defaultValueString)
private val memoryOverheadMiB = sparkConf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
@@ -103,25 +102,13 @@ private[spark] class BasicDriverConfigurationStep(
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
}
- val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
+ val driverContainerWithoutArgs = new ContainerBuilder(driverSpec.driverContainer)
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverContainerImage)
.withImagePullPolicy(imagePullPolicy)
.addAllToEnv(driverCustomEnvs.asJava)
.addToEnv(driverExtraClasspathEnv.toSeq: _*)
.addNewEnv()
- .withName(ENV_DRIVER_MEMORY)
- .withValue(driverMemoryString)
- .endEnv()
- .addNewEnv()
- .withName(ENV_DRIVER_MAIN_CLASS)
- .withValue(mainClass)
- .endEnv()
- .addNewEnv()
- .withName(ENV_DRIVER_ARGS)
- .withValue(appArgs.mkString(" "))
- .endEnv()
- .addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
@@ -134,7 +121,16 @@ private[spark] class BasicDriverConfigurationStep(
.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
.endResources()
.addToArgs("driver")
- .build()
+ .addToArgs("--properties-file", SPARK_CONF_PATH)
+ .addToArgs("--class", mainClass)
+ // The user application jar is merged into the spark.jars list and managed through that
+ // property, so there is no need to reference it explicitly here.
+ .addToArgs(SparkLauncher.NO_RESOURCE)
+
+ val driverContainer = appArgs.toList match {
+ case "" :: Nil | Nil => driverContainerWithoutArgs.build()
+ case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build()
+ }
val baseDriverPod = new PodBuilder(driverSpec.driverPod)
.editOrNewMetadata()
@@ -152,10 +148,14 @@ private[spark] class BasicDriverConfigurationStep(
.setIfMissing(KUBERNETES_DRIVER_POD_NAME, driverPodName)
.set("spark.app.id", kubernetesAppId)
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, resourceNamePrefix)
+ // Set the config flag that allows client-mode spark-submit from the driver
+ .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
driverSpec.copy(
driverPod = baseDriverPod,
driverSparkConf = resolvedSparkConf,
driverContainer = driverContainer)
}
+
}
+
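Pieced together from the arguments added above, the driver container's command line now takes roughly this shape (`SPARK_CONF_PATH` expands to `/opt/spark/conf/spark.properties` per the new constants, and `SparkLauncher.NO_RESOURCE` is the `spark-internal` placeholder; bracketed values vary per application):

```bash
driver --properties-file /opt/spark/conf/spark.properties \
  --class <mainClass> spark-internal [appArgs...]
```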
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
index d4b8323..43de329 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
@@ -30,13 +30,11 @@ import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
*/
private[spark] class DependencyResolutionStep(
sparkJars: Seq[String],
- sparkFiles: Seq[String],
- jarsDownloadPath: String,
- filesDownloadPath: String) extends DriverConfigurationStep {
+ sparkFiles: Seq[String]) extends DriverConfigurationStep {
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
- val resolvedSparkJars = KubernetesUtils.resolveFileUris(sparkJars, jarsDownloadPath)
- val resolvedSparkFiles = KubernetesUtils.resolveFileUris(sparkFiles, filesDownloadPath)
+ val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(sparkJars)
+ val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(sparkFiles)
val sparkConf = driverSpec.driverSparkConf.clone()
if (resolvedSparkJars.nonEmpty) {
@@ -45,14 +43,12 @@ private[spark] class DependencyResolutionStep(
if (resolvedSparkFiles.nonEmpty) {
sparkConf.set("spark.files", resolvedSparkFiles.mkString(","))
}
-
- val resolvedClasspath = KubernetesUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
- val resolvedDriverContainer = if (resolvedClasspath.nonEmpty) {
+ val resolvedDriverContainer = if (resolvedSparkJars.nonEmpty) {
new ContainerBuilder(driverSpec.driverContainer)
.addNewEnv()
- .withName(ENV_MOUNTED_CLASSPATH)
- .withValue(resolvedClasspath.mkString(File.pathSeparator))
- .endEnv()
+ .withName(ENV_MOUNTED_CLASSPATH)
+ .withValue(resolvedSparkJars.mkString(File.pathSeparator))
+ .endEnv()
.build()
} else {
driverSpec.driverContainer
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
deleted file mode 100644
index 9fb3daf..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.StringWriter
-import java.util.Properties
-
-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, ContainerBuilder, HasMetadata}
-
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
-
-/**
- * Configures the driver init-container that localizes remote dependencies into the driver pod.
- * It applies the given InitContainerConfigurationSteps in the given order to produce a final
- * InitContainerSpec that is then used to configure the driver pod with the init-container attached.
- * It also builds a ConfigMap that will be mounted into the init-container. The ConfigMap carries
- * configuration properties for the init-container.
- */
-private[spark] class DriverInitContainerBootstrapStep(
- steps: Seq[InitContainerConfigurationStep],
- configMapName: String,
- configMapKey: String)
- extends DriverConfigurationStep {
-
- override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
- var initContainerSpec = InitContainerSpec(
- properties = Map.empty[String, String],
- driverSparkConf = Map.empty[String, String],
- initContainer = new ContainerBuilder().build(),
- driverContainer = driverSpec.driverContainer,
- driverPod = driverSpec.driverPod,
- dependentResources = Seq.empty[HasMetadata])
- for (nextStep <- steps) {
- initContainerSpec = nextStep.configureInitContainer(initContainerSpec)
- }
-
- val configMap = buildConfigMap(
- configMapName,
- configMapKey,
- initContainerSpec.properties)
- val resolvedDriverSparkConf = driverSpec.driverSparkConf
- .clone()
- .set(INIT_CONTAINER_CONFIG_MAP_NAME, configMapName)
- .set(INIT_CONTAINER_CONFIG_MAP_KEY_CONF, configMapKey)
- .setAll(initContainerSpec.driverSparkConf)
- val resolvedDriverPod = KubernetesUtils.appendInitContainer(
- initContainerSpec.driverPod, initContainerSpec.initContainer)
-
- driverSpec.copy(
- driverPod = resolvedDriverPod,
- driverContainer = initContainerSpec.driverContainer,
- driverSparkConf = resolvedDriverSparkConf,
- otherKubernetesResources =
- driverSpec.otherKubernetesResources ++
- initContainerSpec.dependentResources ++
- Seq(configMap))
- }
-
- private def buildConfigMap(
- configMapName: String,
- configMapKey: String,
- config: Map[String, String]): ConfigMap = {
- val properties = new Properties()
- config.foreach { entry =>
- properties.setProperty(entry._1, entry._2)
- }
- val propertiesWriter = new StringWriter()
- properties.store(propertiesWriter,
- s"Java properties built from Kubernetes config map with name: $configMapName " +
- s"and config map key: $configMapKey")
- new ConfigMapBuilder()
- .withNewMetadata()
- .withName(configMapName)
- .endMetadata()
- .addToData(configMapKey, propertiesWriter.toString)
- .build()
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
index ccc1890..2424e63 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
@@ -99,7 +99,7 @@ private[spark] class DriverKubernetesCredentialsStep(
}.getOrElse(driverSpec.driverPod)
)
- val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { secret =>
+ val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { _ =>
new ContainerBuilder(driverSpec.driverContainer)
.addNewVolumeMount()
.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
deleted file mode 100644
index 0146985..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-
-/**
- * Performs basic configuration for the driver init-container with most of the work delegated to
- * the given InitContainerBootstrap.
- */
-private[spark] class BasicInitContainerConfigurationStep(
- sparkJars: Seq[String],
- sparkFiles: Seq[String],
- jarsDownloadPath: String,
- filesDownloadPath: String,
- bootstrap: InitContainerBootstrap)
- extends InitContainerConfigurationStep {
-
- override def configureInitContainer(spec: InitContainerSpec): InitContainerSpec = {
- val remoteJarsToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkJars)
- val remoteFilesToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkFiles)
- val remoteJarsConf = if (remoteJarsToDownload.nonEmpty) {
- Map(INIT_CONTAINER_REMOTE_JARS.key -> remoteJarsToDownload.mkString(","))
- } else {
- Map()
- }
- val remoteFilesConf = if (remoteFilesToDownload.nonEmpty) {
- Map(INIT_CONTAINER_REMOTE_FILES.key -> remoteFilesToDownload.mkString(","))
- } else {
- Map()
- }
-
- val baseInitContainerConfig = Map(
- JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath,
- FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath) ++
- remoteJarsConf ++
- remoteFilesConf
-
- val bootstrapped = bootstrap.bootstrapInitContainer(
- PodWithDetachedInitContainer(
- spec.driverPod,
- spec.initContainer,
- spec.driverContainer))
-
- spec.copy(
- initContainer = bootstrapped.initContainer,
- driverContainer = bootstrapped.mainContainer,
- driverPod = bootstrapped.pod,
- properties = spec.properties ++ baseInitContainerConfig)
- }
-}
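
The interesting part of the deleted step is the remote/local split it delegated to KubernetesUtils.getOnlyRemoteFiles. A minimal sketch of that filtering, assuming the usual Spark convention that scheme-less, "file", and "local" URIs are already available on the node or image and everything else must be downloaded (the function name here is illustrative):

    import java.net.URI

    def onlyRemoteUris(uris: Seq[String]): Seq[String] =
      uris.filter { uri =>
        // Treat scheme-less, file:// and local:// URIs as already present;
        // anything else (http, hdfs, s3a, ...) needs to be fetched.
        Option(new URI(uri).getScheme) match {
          case None | Some("file") | Some("local") => false
          case _ => true
        }
      }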
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
deleted file mode 100644
index f2c29c7..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-
-/**
- * Figures out and returns the complete ordered list of InitContainerConfigurationSteps required to
- * configure the driver init-container. The returned steps will be applied in the given order to
- * produce a final InitContainerSpec that is used to construct the driver init-container in
- * DriverInitContainerBootstrapStep. This class is only used when an init-container is needed, i.e.,
- * when there are remote application dependencies to localize.
- */
-private[spark] class InitContainerConfigOrchestrator(
- sparkJars: Seq[String],
- sparkFiles: Seq[String],
- jarsDownloadPath: String,
- filesDownloadPath: String,
- imagePullPolicy: String,
- configMapName: String,
- configMapKey: String,
- sparkConf: SparkConf) {
-
- private val initContainerImage = sparkConf
- .get(INIT_CONTAINER_IMAGE)
- .getOrElse(throw new SparkException(
- "Must specify the init-container image when there are remote dependencies"))
-
- def getAllConfigurationSteps: Seq[InitContainerConfigurationStep] = {
- val initContainerBootstrap = new InitContainerBootstrap(
- initContainerImage,
- imagePullPolicy,
- jarsDownloadPath,
- filesDownloadPath,
- configMapName,
- configMapKey,
- SPARK_POD_DRIVER_ROLE,
- sparkConf)
- val baseStep = new BasicInitContainerConfigurationStep(
- sparkJars,
- sparkFiles,
- jarsDownloadPath,
- filesDownloadPath,
- initContainerBootstrap)
-
- val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf,
- KUBERNETES_DRIVER_SECRETS_PREFIX)
- // Also mount the user-specified driver secrets into the driver's init-container. The
- // init-container may need credentials from the secrets to download remote
- // dependencies. The driver's main container and its init-container share the secrets
- // because the init-container is essentially an implementation detail, and sharing
- // avoids introducing a dedicated configuration property just for the init-container.
- val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
- Seq(new InitContainerMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
- } else {
- Nil
- }
-
- Seq(baseStep) ++ mountSecretsStep
- }
-}
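
The orchestrator/step pattern deleted here matches the fold seen at the top of this diff: the orchestrator assembles an ordered Seq of steps, and each step is a pure function from spec to spec. A generic sketch of that shape (type and method names invented for illustration):

    trait ConfigurationStep[Spec] {
      def configure(spec: Spec): Spec
    }

    // Apply the steps in order, threading the immutable spec through each one.
    def runSteps[Spec](initial: Spec, steps: Seq[ConfigurationStep[Spec]]): Spec =
      steps.foldLeft(initial)((spec, step) => step.configure(spec))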
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
deleted file mode 100644
index 0372ad5..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-/**
- * Represents a step in configuring the driver init-container.
- */
-private[spark] trait InitContainerConfigurationStep {
-
- def configureInitContainer(spec: InitContainerSpec): InitContainerSpec
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
deleted file mode 100644
index 0daa7b9..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-
-/**
- * An init-container configuration step for mounting user-specified secrets onto user-specified
- * paths.
- *
- * @param bootstrap a utility actually handling mounting of the secrets
- */
-private[spark] class InitContainerMountSecretsStep(
- bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
-
- override def configureInitContainer(spec: InitContainerSpec): InitContainerSpec = {
- // Mount the secret volumes; the volumes were already added to the driver pod
- // when the secrets were mounted into the main driver container.
- val initContainer = bootstrap.mountSecrets(spec.initContainer)
- spec.copy(initContainer = initContainer)
- }
-}
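
At the fabric8 level, "mounting secrets" into a container is just adding one VolumeMount per secret; the matching Volume objects must already be declared on the pod, which is exactly what the comment in the deleted step relies on. A self-contained sketch of that operation (the helper name is illustrative, not MountSecretsBootstrap's actual API):

    import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

    def mountSecretVolumes(container: Container, mounts: Map[String, String]): Container =
      mounts.foldLeft(container) { case (c, (volumeName, mountPath)) =>
        // Add one VolumeMount per secret volume already declared on the pod.
        new ContainerBuilder(c)
          .addNewVolumeMount()
            .withName(volumeName)
            .withMountPath(mountPath)
            .endVolumeMount()
          .build()
      }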
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
deleted file mode 100644
index b52c343..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}
-
-/**
- * Represents a specification of the init-container for the driver pod.
- *
- * @param properties properties that should be set on the init-container
- * @param driverSparkConf Spark configuration properties that will be carried back to the driver
- * @param initContainer the init-container object
- * @param driverContainer the driver container object
- * @param driverPod the driver pod object
- * @param dependentResources resources the init-container depends on in order to function
- */
-private[spark] case class InitContainerSpec(
- properties: Map[String, String],
- driverSparkConf: Map[String, String],
- initContainer: Container,
- driverContainer: Container,
- driverPod: Pod,
- dependentResources: Seq[HasMetadata])
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 141bd28..98cbd56 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model._
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, PodWithDetachedInitContainer}
+import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
@@ -34,18 +34,10 @@ import org.apache.spark.util.Utils
* @param sparkConf Spark configuration
* @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
* user-specified paths into the executor container
- * @param initContainerBootstrap an optional component for bootstrapping the executor init-container
- * if one is needed, i.e., when there are remote dependencies to
- * localize
- * @param initContainerMountSecretsBootstrap an optional component for mounting user-specified
- * secrets onto user-specified paths into the executor
- * init-container
*/
private[spark] class ExecutorPodFactory(
sparkConf: SparkConf,
- mountSecretsBootstrap: Option[MountSecretsBootstrap],
- initContainerBootstrap: Option[InitContainerBootstrap],
- initContainerMountSecretsBootstrap: Option[MountSecretsBootstrap]) {
+ mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
@@ -94,8 +86,6 @@ private[spark] class ExecutorPodFactory(
private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
- private val executorJarsDownloadDir = sparkConf.get(JARS_DOWNLOAD_LOCATION)
-
/**
* Configure and construct an executor pod with the given parameters.
*/
@@ -147,8 +137,9 @@ private[spark] class ExecutorPodFactory(
(ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, applicationId),
- (ENV_EXECUTOR_ID, executorId),
- (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs)
+ // Set SPARK_CONF_DIR to /opt/spark/conf.
+ (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
+ (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs)
.map(env => new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
@@ -221,30 +212,10 @@ private[spark] class ExecutorPodFactory(
(bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
}.getOrElse((executorPod, containerWithLimitCores))
- val (bootstrappedPod, bootstrappedContainer) =
- initContainerBootstrap.map { bootstrap =>
- val podWithInitContainer = bootstrap.bootstrapInitContainer(
- PodWithDetachedInitContainer(
- maybeSecretsMountedPod,
- new ContainerBuilder().build(),
- maybeSecretsMountedContainer))
-
- val (pod, maybeSecretsMountedInitContainer) =
- initContainerMountSecretsBootstrap.map { bootstrap =>
- // Mount the secret volumes; the volumes were already added to the
- // executor pod when the secrets were mounted into the main executor container.
- (podWithInitContainer.pod, bootstrap.mountSecrets(podWithInitContainer.initContainer))
- }.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))
-
- val bootstrappedPod = KubernetesUtils.appendInitContainer(
- pod, bootstrap.mountSecrets(podWithInitContainer.initContainer))
- }.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))
-
- val bootstrappedPod = KubernetesUtils.appendInitContainer(
- pod, maybeSecretsMountedInitContainer)
-
- (bootstrappedPod, podWithInitContainer.mainContainer)
- }.getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
- new PodBuilder(bootstrappedPod)
+ new PodBuilder(maybeSecretsMountedPod)
.editSpec()
- .addToContainers(bootstrappedContainer)
+ .addToContainers(maybeSecretsMountedContainer)
.endSpec()
.build()
}
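
The executor env-var change above swaps the mounted-classpath variable for SPARK_CONF_DIR. A sketch of the construction with the constants inlined; "SPARK_CONF_DIR", "SPARK_EXECUTOR_ID" and "/opt/spark/conf" are assumptions about what ENV_SPARK_CONF_DIR, ENV_EXECUTOR_ID and SPARK_CONF_DIR_INTERNAL resolve to, based on the inline comment in the diff:

    import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

    val executorEnv: Seq[EnvVar] = Seq(
      "SPARK_CONF_DIR"    -> "/opt/spark/conf", // assumed value of SPARK_CONF_DIR_INTERNAL
      "SPARK_EXECUTOR_ID" -> "1"
    ).map { case (name, value) =>
      new EnvVarBuilder().withName(name).withValue(value).build()
    }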
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index a942db6..ff5f680 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,7 +21,7 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
@@ -33,7 +33,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
- if (masterURL.startsWith("k8s") && sc.deployMode == "client") {
+ if (masterURL.startsWith("k8s") &&
+ sc.deployMode == "client" &&
+ !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
throw new SparkException("Client mode is currently not supported for Kubernetes.")
}
@@ -44,74 +46,23 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
- val sparkConf = sc.getConf
- val initContainerConfigMap = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_NAME)
- val initContainerConfigMapKey = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_KEY_CONF)
-
- if (initContainerConfigMap.isEmpty) {
- logWarning("The executor's init-container config map is not specified. Executors will " +
- "therefore not attempt to fetch remote or submitted dependencies.")
- }
-
- if (initContainerConfigMapKey.isEmpty) {
- logWarning("The executor's init-container config map key is not specified. Executors will " +
- "therefore not attempt to fetch remote or submitted dependencies.")
- }
-
- // Only set up the bootstrap if they've provided both the config map key and the config map
- // name. The config map might not be provided if init-containers aren't being used to
- // bootstrap dependencies.
- val initContainerBootstrap = for {
- configMap <- initContainerConfigMap
- configMapKey <- initContainerConfigMapKey
- } yield {
- val initContainerImage = sparkConf
- .get(INIT_CONTAINER_IMAGE)
- .getOrElse(throw new SparkException(
- "Must specify the init-container image when there are remote dependencies"))
- new InitContainerBootstrap(
- initContainerImage,
- sparkConf.get(CONTAINER_IMAGE_PULL_POLICY),
- sparkConf.get(JARS_DOWNLOAD_LOCATION),
- sparkConf.get(FILES_DOWNLOAD_LOCATION),
- configMap,
- configMapKey,
- SPARK_POD_EXECUTOR_ROLE,
- sparkConf)
- }
-
val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+ sc.conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
- // Also mount the user-specified executor secrets into the executor's init-container. The
- // init-container may need credentials from the secrets to download remote
- // dependencies. The executor's main container and its init-container share the secrets
- // because the init-container is essentially an implementation detail, and sharing
- // avoids introducing a dedicated configuration property just for the init-container.
- val initContainerMountSecretsBootstrap = if (initContainerBootstrap.nonEmpty &&
- executorSecretNamesToMountPaths.nonEmpty) {
- Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
- } else {
- None
- }
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
KUBERNETES_MASTER_INTERNAL_URL,
- Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+ Some(sc.conf.get(KUBERNETES_NAMESPACE)),
KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
- sparkConf,
+ sc.conf,
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
- val executorPodFactory = new ExecutorPodFactory(
- sparkConf,
- mountSecretBootstrap,
- initContainerBootstrap,
- initContainerMountSecretsBootstrap)
+ val executorPodFactory = new ExecutorPodFactory(sc.conf, mountSecretBootstrap)
val allocatorExecutor = ThreadUtils
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
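
The relaxed guard above allows "client" deploy mode only when the driver itself was launched by the Kubernetes submission client. A minimal sketch of that check; the key "spark.kubernetes.submitInDriver" is an assumption about what KUBERNETES_DRIVER_SUBMIT_CHECK resolves to:

    import org.apache.spark.{SparkConf, SparkException}

    def checkClientMode(masterURL: String, deployMode: String, conf: SparkConf): Unit = {
      // The submission client is assumed to set this flag on the driver's conf,
      // so an in-cluster driver is not rejected by the client-mode guard.
      val submittedInDriver =
        conf.getOption("spark.kubernetes.submitInDriver").exists(_.toBoolean)
      if (masterURL.startsWith("k8s") && deployMode == "client" && !submittedInDriver) {
        throw new SparkException("Client mode is currently not supported for Kubernetes.")
      }
    }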
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
deleted file mode 100644
index e0f29ec..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import java.io.File
-import java.util.UUID
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import org.mockito.Mockito
-import org.scalatest.BeforeAndAfter
-import org.scalatest.mockito.MockitoSugar._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.util.Utils
-
-class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter {
-
- private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt")
- private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt")
-
- private var downloadJarsDir: File = _
- private var downloadFilesDir: File = _
- private var downloadJarsSecretValue: String = _
- private var downloadFilesSecretValue: String = _
- private var fileFetcher: FileFetcher = _
-
- override def beforeAll(): Unit = {
- downloadJarsSecretValue = Files.toString(
- new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8)
- downloadFilesSecretValue = Files.toString(
- new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8)
- }
-
- before {
- downloadJarsDir = Utils.createTempDir()
- downloadFilesDir = Utils.createTempDir()
- fileFetcher = mock[FileFetcher]
- }
-
- after {
- downloadJarsDir.delete()
- downloadFilesDir.delete()
- }
-
- test("Downloads from remote server should invoke the file fetcher") {
- val sparkConf = getSparkConfForRemoteFileDownloads
- val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher)
- initContainerUnderTest.run()
- Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
- Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
- Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
- }
-
- private def getSparkConfForRemoteFileDownloads: SparkConf = {
- new SparkConf(true)
- .set(INIT_CONTAINER_REMOTE_JARS,
- "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar")
- .set(INIT_CONTAINER_REMOTE_FILES,
- "http://localhost:9000/file.txt")
- .set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
- .set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
- }
-
- private def createTempFile(extension: String): String = {
- val dir = Utils.createTempDir()
- val file = new File(dir, s"${UUID.randomUUID().toString}.$extension")
- Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8)
- file.getAbsolutePath
- }
-}
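
The deleted suite boils down to one pattern: mock the file fetcher, run the component under test, then verify the expected fetchFile calls. A stripped-down sketch of that pattern (the trait and paths here are illustrative stand-ins, and the call under test is made directly rather than through SparkPodInitContainer):

    import java.io.File

    import org.mockito.Mockito.{mock, verify}

    object FetcherVerifySketch extends App {
      trait FileFetcherLike {
        def fetchFile(uri: String, targetDir: File): Unit
      }

      val fetcher = mock(classOf[FileFetcherLike])
      // Stand-in for the code under test invoking the fetcher.
      fetcher.fetchFile("http://localhost:9000/jar1.jar", new File("/tmp/jars"))
      // Assert the expected download happened.
      verify(fetcher).fetchFile("http://localhost:9000/jar1.jar", new File("/tmp/jars"))
    }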