You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mc...@apache.org on 2018/03/19 18:30:23 UTC

[2/2] spark git commit: [SPARK-22839][K8S] Remove the use of init-container for downloading remote dependencies

[SPARK-22839][K8S] Remove the use of init-container for downloading remote dependencies

## What changes were proposed in this pull request?

This patch removes the init-container that was used for downloading remote dependencies. It builds on work by vanzin toward refactoring the driver/executor configuration, which is described in [SPARK-22839](https://issues.apache.org/jira/browse/SPARK-22839).

## How was this patch tested?

This patch was tested with unit and integration tests.

Author: Ilan Filonenko <if...@cornell.edu>

Closes #20669 from ifilonenko/remove-init-container.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f15906da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f15906da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f15906da

Branch: refs/heads/master
Commit: f15906da153f139b698e192ec6f82f078f896f1e
Parents: 4de638c
Author: Ilan Filonenko <if...@cornell.edu>
Authored: Mon Mar 19 11:29:56 2018 -0700
Committer: mcheah <mc...@palantir.com>
Committed: Mon Mar 19 11:29:56 2018 -0700

----------------------------------------------------------------------
 bin/docker-image-tool.sh                        |   9 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   |   2 -
 docs/running-on-kubernetes.md                   |  71 +-------
 .../spark/examples/SparkRemoteFileTest.scala    |  48 ++++++
 .../org/apache/spark/deploy/k8s/Config.scala    |  73 +--------
 .../org/apache/spark/deploy/k8s/Constants.scala |  21 +--
 .../deploy/k8s/InitContainerBootstrap.scala     | 120 --------------
 .../spark/deploy/k8s/KubernetesUtils.scala      |  63 +-------
 .../k8s/PodWithDetachedInitContainer.scala      |  31 ----
 .../deploy/k8s/SparkPodInitContainer.scala      | 116 --------------
 .../k8s/submit/DriverConfigOrchestrator.scala   |  45 +-----
 .../submit/KubernetesClientApplication.scala    |  84 ++++++----
 .../steps/BasicDriverConfigurationStep.scala    |  32 ++--
 .../submit/steps/DependencyResolutionStep.scala |  18 +--
 .../DriverInitContainerBootstrapStep.scala      |  95 -----------
 .../steps/DriverKubernetesCredentialsStep.scala |   2 +-
 .../BasicInitContainerConfigurationStep.scala   |  67 --------
 .../InitContainerConfigOrchestrator.scala       |  79 ---------
 .../InitContainerConfigurationStep.scala        |  25 ---
 .../InitContainerMountSecretsStep.scala         |  36 -----
 .../steps/initcontainer/InitContainerSpec.scala |  37 -----
 .../cluster/k8s/ExecutorPodFactory.scala        |  43 +----
 .../cluster/k8s/KubernetesClusterManager.scala  |  65 +-------
 .../deploy/k8s/SparkPodInitContainerSuite.scala |  86 ----------
 .../spark/deploy/k8s/submit/ClientSuite.scala   |  82 +++++-----
 .../submit/DriverConfigOrchestratorSuite.scala  |  41 +----
 .../BasicDriverConfigurationStepSuite.scala     |   8 +-
 .../steps/DependencyResolutionStepSuite.scala   |  32 ++--
 .../DriverInitContainerBootstrapStepSuite.scala | 160 -------------------
 ...sicInitContainerConfigurationStepSuite.scala |  95 -----------
 .../InitContainerConfigOrchestratorSuite.scala  |  80 ----------
 .../InitContainerMountSecretsStepSuite.scala    |  52 ------
 .../cluster/k8s/ExecutorPodFactorySuite.scala   |  67 ++------
 .../src/main/dockerfiles/spark/Dockerfile       |   1 -
 .../src/main/dockerfiles/spark/entrypoint.sh    |  20 +--
 35 files changed, 241 insertions(+), 1665 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/bin/docker-image-tool.sh
----------------------------------------------------------------------
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index 0d0f564..f090240 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -64,9 +64,11 @@ function build {
     error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
   fi
 
+  local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+
   docker build "${BUILD_ARGS[@]}" \
     -t $(image_ref spark) \
-    -f "$IMG_PATH/spark/Dockerfile" .
+    -f "$DOCKERFILE" .
 }
 
 function push {
@@ -84,6 +86,7 @@ Commands:
   push        Push a pre-built image to a registry. Requires a repository address to be provided.
 
 Options:
+  -f file     Dockerfile to build. By default builds the Dockerfile shipped with Spark.
   -r repo     Repository address.
   -t tag      Tag to apply to the built image, or to identify the image to be pushed.
   -m          Use minikube's Docker daemon.
@@ -113,10 +116,12 @@ fi
 
 REPO=
 TAG=
-while getopts mr:t: option
+DOCKERFILE=
+while getopts f:mr:t: option
 do
  case "${option}"
  in
+ f) DOCKERFILE=${OPTARG};;
  r) REPO=${OPTARG};;
  t) TAG=${OPTARG};;
  m)

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1e38196..329bde0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -320,8 +320,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
         printErrorAndExit("Python applications are currently not supported for Kubernetes.")
       case (KUBERNETES, _) if args.isR =>
         printErrorAndExit("R applications are currently not supported for Kubernetes.")
-      case (KUBERNETES, CLIENT) =>
-        printErrorAndExit("Client mode is currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
         printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
       case (_, CLUSTER) if isShell(args.primaryResource) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 3c7586e..975b28d 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -126,29 +126,6 @@ Those dependencies can be added to the classpath by referencing them with `local
 dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
 client's local file system is currently not yet supported.
 
-
-### Using Remote Dependencies
-When there are application dependencies hosted in remote locations like HDFS or HTTP servers, the driver and executor pods
-need a Kubernetes [init-container](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/) for downloading
-the dependencies so the driver and executor containers can use them locally.
-
-The init-container handles remote dependencies specified in `spark.jars` (or the `--jars` option of `spark-submit`) and
-`spark.files` (or the `--files` option of `spark-submit`). It also handles remotely hosted main application resources, e.g.,
-the main application jar. The following shows an example of using remote dependencies with the `spark-submit` command:
-
-```bash
-$ bin/spark-submit \
-    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
-    --deploy-mode cluster \
-    --name spark-pi \
-    --class org.apache.spark.examples.SparkPi \
-    --jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
-    --files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
-    --conf spark.executor.instances=5 \
-    --conf spark.kubernetes.container.image=<spark-image> \
-    https://path/to/examples.jar
-```
-
 ## Secret Management
 Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
 Spark application to access secured services. To mount a user-specified secret into the driver container, users can use
@@ -163,10 +140,6 @@ namespace as that of the driver and executor pods. For example, to mount a secre
 --conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
 ```
 
-Note that if an init-container is used, any secret mounted into the driver container will also be mounted into the
-init-container of the driver. Similarly, any secret mounted into an executor container will also be mounted into the
-init-container of the executor.
-
 ## Introspection and Debugging
 
 These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -605,50 +578,11 @@ specific to Spark on Kubernetes.
   </td>
 </tr>
 <tr>
-  <td><code>spark.kubernetes.mountDependencies.jarsDownloadDir</code></td>
-  <td><code>/var/spark-data/spark-jars</code></td>
-  <td>
-    Location to download jars to in the driver and executors.
-    This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.kubernetes.mountDependencies.filesDownloadDir</code></td>
-  <td><code>/var/spark-data/spark-files</code></td>
-  <td>
-    Location to download jars to in the driver and executors.
-    This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.kubernetes.mountDependencies.timeout</code></td>
-  <td>300s</td>
-  <td>
-   Timeout in seconds before aborting the attempt to download and unpack dependencies from remote locations into
-   the driver and executor pods.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.kubernetes.mountDependencies.maxSimultaneousDownloads</code></td>
-  <td>5</td>
-  <td>
-   Maximum number of remote dependencies to download simultaneously in a driver or executor pod.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.kubernetes.initContainer.image</code></td>
-  <td><code>(value of spark.kubernetes.container.image)</code></td>
-  <td>
-   Custom container image for the init container of both driver and executors.
-  </td>
-</tr>
-<tr>
   <td><code>spark.kubernetes.driver.secrets.[SecretName]</code></td>
   <td>(none)</td>
   <td>
    Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the driver pod on the path specified in the value. For example,
-   <code>spark.kubernetes.driver.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
-   the secret will also be added to the init-container in the driver pod.
+   <code>spark.kubernetes.driver.secrets.spark-secret=/etc/secrets</code>.
   </td>
 </tr>
 <tr>
@@ -656,8 +590,7 @@ specific to Spark on Kubernetes.
   <td>(none)</td>
   <td>
    Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the executor pod on the path specified in the value. For example,
-   <code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
-   the secret will also be added to the init-container in the executor pod.
+   <code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
   </td>
 </tr>
 </table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala b/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
new file mode 100644
index 0000000..64076f2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import java.io.File
+
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.SparkSession
+
+/** Usage: SparkRemoteFileTest <file> */
+object SparkRemoteFileTest {
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      System.err.println("Usage: SparkRemoteFileTest <file>")
+      System.exit(1)
+    }
+    val spark = SparkSession
+      .builder()
+      .appName("SparkRemoteFileTest")
+      .getOrCreate()
+    val sc = spark.sparkContext
+    val rdd = sc.parallelize(Seq(1)).map(_ => {
+      val localLocation = SparkFiles.get(args(0))
+      println(s"${args(0)} is stored at: $localLocation")
+      new File(localLocation).isFile
+    })
+    val truthCheck = rdd.collect().head
+    println(s"Mounting of ${args(0)} was $truthCheck")
+    spark.stop()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 471196a..da34a7e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -79,6 +79,12 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_DRIVER_SUBMIT_CHECK =
+    ConfigBuilder("spark.kubernetes.submitInDriver")
+    .internal()
+    .booleanConf
+    .createOptional
+
   val KUBERNETES_EXECUTOR_LIMIT_CORES =
     ConfigBuilder("spark.kubernetes.executor.limit.cores")
       .doc("Specify the hard cpu limit for each executor pod")
@@ -135,73 +141,6 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")
 
-  val JARS_DOWNLOAD_LOCATION =
-    ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
-      .doc("Location to download jars to in the driver and executors. When using " +
-        "spark-submit, this directory must be empty and will be mounted as an empty directory " +
-        "volume on the driver and executor pod.")
-      .stringConf
-      .createWithDefault("/var/spark-data/spark-jars")
-
-  val FILES_DOWNLOAD_LOCATION =
-    ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
-      .doc("Location to download files to in the driver and executors. When using " +
-        "spark-submit, this directory must be empty and will be mounted as an empty directory " +
-        "volume on the driver and executor pods.")
-      .stringConf
-      .createWithDefault("/var/spark-data/spark-files")
-
-  val INIT_CONTAINER_IMAGE =
-    ConfigBuilder("spark.kubernetes.initContainer.image")
-      .doc("Image for the driver and executor's init-container for downloading dependencies.")
-      .fallbackConf(CONTAINER_IMAGE)
-
-  val INIT_CONTAINER_MOUNT_TIMEOUT =
-    ConfigBuilder("spark.kubernetes.mountDependencies.timeout")
-      .doc("Timeout before aborting the attempt to download and unpack dependencies from remote " +
-        "locations into the driver and executor pods.")
-      .timeConf(TimeUnit.SECONDS)
-      .createWithDefault(300)
-
-  val INIT_CONTAINER_MAX_THREAD_POOL_SIZE =
-    ConfigBuilder("spark.kubernetes.mountDependencies.maxSimultaneousDownloads")
-      .doc("Maximum number of remote dependencies to download simultaneously in a driver or " +
-        "executor pod.")
-      .intConf
-      .createWithDefault(5)
-
-  val INIT_CONTAINER_REMOTE_JARS =
-    ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
-      .doc("Comma-separated list of jar URIs to download in the init-container. This is " +
-        "calculated from spark.jars.")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val INIT_CONTAINER_REMOTE_FILES =
-    ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
-      .doc("Comma-separated list of file URIs to download in the init-container. This is " +
-        "calculated from spark.files.")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val INIT_CONTAINER_CONFIG_MAP_NAME =
-    ConfigBuilder("spark.kubernetes.initContainer.configMapName")
-      .doc("Name of the config map to use in the init-container that retrieves submitted files " +
-        "for the executor.")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
-    ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
-      .doc("Key for the entry in the init container config map for submitted files that " +
-        "corresponds to the properties for this init-container.")
-      .internal()
-      .stringConf
-      .createOptional
-
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 9411956..8da5f24 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -63,22 +63,13 @@ private[spark] object Constants {
   val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
   val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
   val ENV_CLASSPATH = "SPARK_CLASSPATH"
-  val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
-  val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
-  val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
   val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
-  val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
-  val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
-
-  // Bootstrapping dependencies with the init-container
-  val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
-  val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files-volume"
-  val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
-  val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
-  val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
-  val INIT_CONTAINER_PROPERTIES_FILE_PATH =
-    s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
-  val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
+  val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
+  // Spark app configs for containers
+  val SPARK_CONF_VOLUME = "spark-conf-volume"
+  val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
+  val SPARK_CONF_FILE_NAME = "spark.properties"
+  val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
 
   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
deleted file mode 100644
index f6a57df..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-
-/**
- * Bootstraps an init-container for downloading remote dependencies. This is separated out from
- * the init-container steps API because this component can be used to bootstrap init-containers
- * for both the driver and executors.
- */
-private[spark] class InitContainerBootstrap(
-    initContainerImage: String,
-    imagePullPolicy: String,
-    jarsDownloadPath: String,
-    filesDownloadPath: String,
-    configMapName: String,
-    configMapKey: String,
-    sparkRole: String,
-    sparkConf: SparkConf) {
-
-  /**
-   * Bootstraps an init-container that downloads dependencies to be used by a main container.
-   */
-  def bootstrapInitContainer(
-      original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
-    val sharedVolumeMounts = Seq[VolumeMount](
-      new VolumeMountBuilder()
-        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
-        .withMountPath(jarsDownloadPath)
-        .build(),
-      new VolumeMountBuilder()
-        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
-        .withMountPath(filesDownloadPath)
-        .build())
-
-    val customEnvVarKeyPrefix = sparkRole match {
-      case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
-      case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
-      case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
-    }
-    val customEnvVars = sparkConf.getAllWithPrefix(customEnvVarKeyPrefix).toSeq.map {
-      case (key, value) =>
-        new EnvVarBuilder()
-          .withName(key)
-          .withValue(value)
-          .build()
-    }
-
-    val initContainer = new ContainerBuilder(original.initContainer)
-      .withName("spark-init")
-      .withImage(initContainerImage)
-      .withImagePullPolicy(imagePullPolicy)
-      .addAllToEnv(customEnvVars.asJava)
-      .addNewVolumeMount()
-        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
-        .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
-        .endVolumeMount()
-      .addToVolumeMounts(sharedVolumeMounts: _*)
-      .addToArgs("init")
-      .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
-      .build()
-
-    val podWithBasicVolumes = new PodBuilder(original.pod)
-      .editSpec()
-      .addNewVolume()
-        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
-        .withNewConfigMap()
-          .withName(configMapName)
-          .addNewItem()
-            .withKey(configMapKey)
-            .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
-            .endItem()
-          .endConfigMap()
-        .endVolume()
-      .addNewVolume()
-        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
-        .withEmptyDir(new EmptyDirVolumeSource())
-        .endVolume()
-      .addNewVolume()
-        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
-        .withEmptyDir(new EmptyDirVolumeSource())
-        .endVolume()
-      .endSpec()
-      .build()
-
-    val mainContainer = new ContainerBuilder(original.mainContainer)
-      .addToVolumeMounts(sharedVolumeMounts: _*)
-      .addNewEnv()
-        .withName(ENV_MOUNTED_FILES_DIR)
-        .withValue(filesDownloadPath)
-        .endEnv()
-      .build()
-
-    PodWithDetachedInitContainer(
-      podWithBasicVolumes,
-      initContainer,
-      mainContainer)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 37331d8..5bc0701 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,10 +16,6 @@
  */
 package org.apache.spark.deploy.k8s
 
-import java.io.File
-
-import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
-
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
 
@@ -44,71 +40,22 @@ private[spark] object KubernetesUtils {
   }
 
   /**
-   * Append the given init-container to a pod's list of init-containers.
-   *
-   * @param originalPodSpec original specification of the pod
-   * @param initContainer the init-container to add to the pod
-   * @return the pod with the init-container added to the list of InitContainers
-   */
-  def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
-    new PodBuilder(originalPodSpec)
-      .editOrNewSpec()
-        .addToInitContainers(initContainer)
-        .endSpec()
-      .build()
-  }
-
-  /**
    * For the given collection of file URIs, resolves them as follows:
-   * - File URIs with scheme file:// are resolved to the given download path.
    * - File URIs with scheme local:// resolve to just the path of the URI.
    * - Otherwise, the URIs are returned as-is.
    */
-  def resolveFileUris(
-      fileUris: Iterable[String],
-      fileDownloadPath: String): Iterable[String] = {
-    fileUris.map { uri =>
-      resolveFileUri(uri, fileDownloadPath, false)
-    }
-  }
-
-  /**
-   * If any file uri has any scheme other than local:// it is mapped as if the file
-   * was downloaded to the file download path. Otherwise, it is mapped to the path
-   * part of the URI.
-   */
-  def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
+  def resolveFileUrisAndPath(fileUris: Iterable[String]): Iterable[String] = {
     fileUris.map { uri =>
-      resolveFileUri(uri, fileDownloadPath, true)
-    }
-  }
-
-  /**
-   * Get from a given collection of file URIs the ones that represent remote files.
-   */
-  def getOnlyRemoteFiles(uris: Iterable[String]): Iterable[String] = {
-    uris.filter { uri =>
-      val scheme = Utils.resolveURI(uri).getScheme
-      scheme != "file" && scheme != "local"
+      resolveFileUri(uri)
     }
   }
 
-  private def resolveFileUri(
-      uri: String,
-      fileDownloadPath: String,
-      assumesDownloaded: Boolean): String = {
+  private def resolveFileUri(uri: String): String = {
     val fileUri = Utils.resolveURI(uri)
     val fileScheme = Option(fileUri.getScheme).getOrElse("file")
     fileScheme match {
-      case "local" =>
-        fileUri.getPath
-      case _ =>
-        if (assumesDownloaded || fileScheme == "file") {
-          val fileName = new File(fileUri.getPath).getName
-          s"$fileDownloadPath/$fileName"
-        } else {
-          uri
-        }
+      case "local" => fileUri.getPath
+      case _ => uri
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
deleted file mode 100644
index 0b79f8b..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import io.fabric8.kubernetes.api.model.{Container, Pod}
-
-/**
- * Represents a pod with a detached init-container (not yet added to the pod).
- *
- * @param pod the pod
- * @param initContainer the init-container in the pod
- * @param mainContainer the main container in the pod
- */
-private[spark] case class PodWithDetachedInitContainer(
-    pod: Pod,
-    initContainer: Container,
-    mainContainer: Container)

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
deleted file mode 100644
index c0f0878..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-/**
- * Process that fetches files from a resource staging server and/or arbitrary remote locations.
- *
- * The init-container can handle fetching files from any of those sources, but not all of the
- * sources need to be specified. This allows for composing multiple instances of this container
- * with different configurations for different download sources, or using the same container to
- * download everything at once.
- */
-private[spark] class SparkPodInitContainer(
-    sparkConf: SparkConf,
-    fileFetcher: FileFetcher) extends Logging {
-
-  private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE)
-  private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize))
-
-  private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION))
-  private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION))
-
-  private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS)
-  private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES)
-
-  private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
-
-  def run(): Unit = {
-    logInfo(s"Downloading remote jars: $remoteJars")
-    downloadFiles(
-      remoteJars,
-      jarsDownloadDir,
-      s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
-        "or is not a directory.")
-
-    logInfo(s"Downloading remote files: $remoteFiles")
-    downloadFiles(
-      remoteFiles,
-      filesDownloadDir,
-      s"Remote files download directory specified at $filesDownloadDir does not exist " +
-        "or is not a directory.")
-
-    downloadExecutor.shutdown()
-    downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
-  }
-
-  private def downloadFiles(
-      filesCommaSeparated: Option[String],
-      downloadDir: File,
-      errMessage: String): Unit = {
-    filesCommaSeparated.foreach { files =>
-      require(downloadDir.isDirectory, errMessage)
-      Utils.stringToSeq(files).foreach { file =>
-        Future[Unit] {
-          fileFetcher.fetchFile(file, downloadDir)
-        }
-      }
-    }
-  }
-}
-
-private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) {
-
-  def fetchFile(uri: String, targetDir: File): Unit = {
-    Utils.fetchFile(
-      url = uri,
-      targetDir = targetDir,
-      conf = sparkConf,
-      securityMgr = securityManager,
-      hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
-      timestamp = System.currentTimeMillis(),
-      useCache = false)
-  }
-}
-
-object SparkPodInitContainer extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    logInfo("Starting init-container to download Spark application dependencies.")
-    val sparkConf = new SparkConf(true)
-    if (args.nonEmpty) {
-      Utils.loadDefaultSparkProperties(sparkConf, args(0))
-    }
-
-    val securityManager = new SparkSecurityManager(sparkConf)
-    val fileFetcher = new FileFetcher(sparkConf, securityManager)
-    new SparkPodInitContainer(sparkConf, fileFetcher).run()
-    logInfo("Finished downloading application dependencies.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
index ae70904..b4d3f04 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
@@ -16,16 +16,11 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import java.util.UUID
-
-import com.google.common.primitives.Longs
-
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit.steps._
-import org.apache.spark.deploy.k8s.submit.steps.initcontainer.InitContainerConfigOrchestrator
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.SystemClock
 import org.apache.spark.util.Utils
@@ -34,13 +29,11 @@ import org.apache.spark.util.Utils
  * Figures out and returns the complete ordered list of needed DriverConfigurationSteps to
  * configure the Spark driver pod. The returned steps will be applied one by one in the given
  * order to produce a final KubernetesDriverSpec that is used in KubernetesClientApplication
- * to construct and create the driver pod. It uses the InitContainerConfigOrchestrator to
- * configure the driver init-container if one is needed, i.e., when there are remote dependencies
- * to localize.
+ * to construct and create the driver pod.
  */
 private[spark] class DriverConfigOrchestrator(
     kubernetesAppId: String,
-    launchTime: Long,
+    kubernetesResourceNamePrefix: String,
     mainAppResource: Option[MainAppResource],
     appName: String,
     mainClass: String,
@@ -50,15 +43,8 @@ private[spark] class DriverConfigOrchestrator(
   // The resource name prefix is derived from the Spark application name, making it easy to connect
   // the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
   // application the user submitted.
-  private val kubernetesResourceNamePrefix = {
-    val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)).toString.replaceAll("-", "")
-    s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
-  }
 
   private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-  private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
-  private val jarsDownloadPath = sparkConf.get(JARS_DOWNLOAD_LOCATION)
-  private val filesDownloadPath = sparkConf.get(FILES_DOWNLOAD_LOCATION)
 
   def getAllConfigurationSteps: Seq[DriverConfigurationStep] = {
     val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -126,9 +112,7 @@ private[spark] class DriverConfigOrchestrator(
     val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
       Seq(new DependencyResolutionStep(
         sparkJars,
-        sparkFiles,
-        jarsDownloadPath,
-        filesDownloadPath))
+        sparkFiles))
     } else {
       Nil
     }
@@ -139,33 +123,12 @@ private[spark] class DriverConfigOrchestrator(
       Nil
     }
 
-    val initContainerBootstrapStep = if (existNonContainerLocalFiles(sparkJars ++ sparkFiles)) {
-      val orchestrator = new InitContainerConfigOrchestrator(
-        sparkJars,
-        sparkFiles,
-        jarsDownloadPath,
-        filesDownloadPath,
-        imagePullPolicy,
-        initContainerConfigMapName,
-        INIT_CONTAINER_PROPERTIES_FILE_NAME,
-        sparkConf)
-      val bootstrapStep = new DriverInitContainerBootstrapStep(
-        orchestrator.getAllConfigurationSteps,
-        initContainerConfigMapName,
-        INIT_CONTAINER_PROPERTIES_FILE_NAME)
-
-      Seq(bootstrapStep)
-    } else {
-      Nil
-    }
-
     Seq(
       initialSubmissionStep,
       serviceBootstrapStep,
       kubernetesCredentialsStep) ++
       dependencyResolutionStep ++
-      mountSecretsStep ++
-      initContainerBootstrapStep
+      mountSecretsStep
   }
 
   private def existSubmissionLocalFiles(files: Seq[String]): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 5884348..e16d1ad 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -16,14 +16,14 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
+import java.io.StringWriter
 import java.util.{Collections, UUID}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.control.NonFatal
+import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
 import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.util.Utils
 
 /**
@@ -93,10 +94,8 @@ private[spark] class Client(
     kubernetesClient: KubernetesClient,
     waitForAppCompletion: Boolean,
     appName: String,
-    watcher: LoggingPodStatusWatcher) extends Logging {
-
-  private val driverJavaOptions = sparkConf.get(
-    org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+    watcher: LoggingPodStatusWatcher,
+    kubernetesResourceNamePrefix: String) extends Logging {
 
    /**
     * Run command that initializes a DriverSpec that will be updated after each
@@ -110,33 +109,31 @@ private[spark] class Client(
     for (nextStep <- submissionSteps) {
       currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
     }
-
-    val resolvedDriverJavaOpts = currentDriverSpec
-      .driverSparkConf
-      // Remove this as the options are instead extracted and set individually below using
-      // environment variables with prefix SPARK_JAVA_OPT_.
-      .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
-      .getAll
-      .map {
-        case (confKey, confValue) => s"-D$confKey=$confValue"
-      } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
-    val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
-      case (option, index) =>
-        new EnvVarBuilder()
-          .withName(s"$ENV_JAVA_OPT_PREFIX$index")
-          .withValue(option)
-          .build()
-    }
-
+    val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
+    val configMap = buildConfigMap(configMapName, currentDriverSpec.driverSparkConf)
+    // Set the SPARK_CONF_DIR environment variable so that the Spark command builder
+    // can pick up the Java options present in the ConfigMap.
     val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
-      .addAllToEnv(driverJavaOptsEnvs.asJava)
+      .addNewEnv()
+        .withName(ENV_SPARK_CONF_DIR)
+        .withValue(SPARK_CONF_DIR_INTERNAL)
+        .endEnv()
+      .addNewVolumeMount()
+        .withName(SPARK_CONF_VOLUME)
+        .withMountPath(SPARK_CONF_DIR_INTERNAL)
+        .endVolumeMount()
       .build()
     val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
       .editSpec()
         .addToContainers(resolvedDriverContainer)
+        .addNewVolume()
+          .withName(SPARK_CONF_VOLUME)
+          .withNewConfigMap()
+            .withName(configMapName)
+            .endConfigMap()
+          .endVolume()
         .endSpec()
       .build()
-
     Utils.tryWithResource(
       kubernetesClient
         .pods()
@@ -145,7 +142,8 @@ private[spark] class Client(
       val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
       try {
         if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
-          val otherKubernetesResources = currentDriverSpec.otherKubernetesResources
+          val otherKubernetesResources =
+            currentDriverSpec.otherKubernetesResources ++ Seq(configMap)
           addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
           kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
         }
@@ -180,6 +178,26 @@ private[spark] class Client(
       originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
     }
   }
+
+  // Build a Config Map that will house spark conf properties in a single file for spark-submit
+  private def buildConfigMap(configMapName: String, conf: SparkConf): ConfigMap = {
+    val properties = new Properties()
+    conf.getAll.foreach { case (k, v) =>
+      properties.setProperty(k, v)
+    }
+    val propertiesWriter = new StringWriter()
+    properties.store(propertiesWriter,
+      s"Java properties built from Kubernetes config map with name: $configMapName")
+
+    val namespace = conf.get(KUBERNETES_NAMESPACE)
+    new ConfigMapBuilder()
+      .withNewMetadata()
+        .withName(configMapName)
+        .withNamespace(namespace)
+        .endMetadata()
+      .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
+      .build()
+  }
 }
 
 /**
@@ -202,6 +220,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val launchTime = System.currentTimeMillis()
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
     val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
+    val kubernetesResourceNamePrefix = {
+      s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+    }
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
     val master = sparkConf.get("spark.master").substring("k8s://".length)
@@ -211,7 +232,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
 
     val orchestrator = new DriverConfigOrchestrator(
       kubernetesAppId,
-      launchTime,
+      kubernetesResourceNamePrefix,
       clientArguments.mainAppResource,
       appName,
       clientArguments.mainClass,
@@ -231,7 +252,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
           kubernetesClient,
           waitForAppCompletion,
           appName,
-          watcher)
+          watcher,
+          kubernetesResourceNamePrefix)
         client.run()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
index 164e2e5..347c4d2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
 import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
+import org.apache.spark.launcher.SparkLauncher
 
 /**
  * Performs basic configuration for the driver pod.
@@ -56,8 +57,6 @@ private[spark] class BasicDriverConfigurationStep(
 
   // Memory settings
   private val driverMemoryMiB = sparkConf.get(DRIVER_MEMORY)
-  private val driverMemoryString = sparkConf.get(
-    DRIVER_MEMORY.key, DRIVER_MEMORY.defaultValueString)
   private val memoryOverheadMiB = sparkConf
     .get(DRIVER_MEMORY_OVERHEAD)
     .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
@@ -103,25 +102,13 @@ private[spark] class BasicDriverConfigurationStep(
       ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
     }
 
-    val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
+    val driverContainerWithoutArgs = new ContainerBuilder(driverSpec.driverContainer)
       .withName(DRIVER_CONTAINER_NAME)
       .withImage(driverContainerImage)
       .withImagePullPolicy(imagePullPolicy)
       .addAllToEnv(driverCustomEnvs.asJava)
       .addToEnv(driverExtraClasspathEnv.toSeq: _*)
       .addNewEnv()
-        .withName(ENV_DRIVER_MEMORY)
-        .withValue(driverMemoryString)
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_DRIVER_MAIN_CLASS)
-        .withValue(mainClass)
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_DRIVER_ARGS)
-        .withValue(appArgs.mkString(" "))
-        .endEnv()
-      .addNewEnv()
         .withName(ENV_DRIVER_BIND_ADDRESS)
         .withValueFrom(new EnvVarSourceBuilder()
           .withNewFieldRef("v1", "status.podIP")
@@ -134,7 +121,16 @@ private[spark] class BasicDriverConfigurationStep(
         .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
         .endResources()
       .addToArgs("driver")
-      .build()
+      .addToArgs("--properties-file", SPARK_CONF_PATH)
+      .addToArgs("--class", mainClass)
+      // The user application jar is merged into the spark.jars list and managed through that
+      // property, so there is no need to reference it explicitly here.
+      .addToArgs(SparkLauncher.NO_RESOURCE)
+
+    val driverContainer = appArgs.toList match {
+      case "" :: Nil | Nil => driverContainerWithoutArgs.build()
+      case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build()
+    }
 
     val baseDriverPod = new PodBuilder(driverSpec.driverPod)
       .editOrNewMetadata()
@@ -152,10 +148,14 @@ private[spark] class BasicDriverConfigurationStep(
       .setIfMissing(KUBERNETES_DRIVER_POD_NAME, driverPodName)
       .set("spark.app.id", kubernetesAppId)
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, resourceNamePrefix)
+      // Set this config flag to allow client-mode spark-submit from within the driver.
+      .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
 
     driverSpec.copy(
       driverPod = baseDriverPod,
       driverSparkConf = resolvedSparkConf,
       driverContainer = driverContainer)
   }
+
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
index d4b8323..43de329 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
@@ -30,13 +30,11 @@ import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
  */
 private[spark] class DependencyResolutionStep(
     sparkJars: Seq[String],
-    sparkFiles: Seq[String],
-    jarsDownloadPath: String,
-    filesDownloadPath: String) extends DriverConfigurationStep {
+    sparkFiles: Seq[String]) extends DriverConfigurationStep {
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val resolvedSparkJars = KubernetesUtils.resolveFileUris(sparkJars, jarsDownloadPath)
-    val resolvedSparkFiles = KubernetesUtils.resolveFileUris(sparkFiles, filesDownloadPath)
+    val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(sparkJars)
+    val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(sparkFiles)
 
     val sparkConf = driverSpec.driverSparkConf.clone()
     if (resolvedSparkJars.nonEmpty) {
@@ -45,14 +43,12 @@ private[spark] class DependencyResolutionStep(
     if (resolvedSparkFiles.nonEmpty) {
       sparkConf.set("spark.files", resolvedSparkFiles.mkString(","))
     }
-
-    val resolvedClasspath = KubernetesUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
-    val resolvedDriverContainer = if (resolvedClasspath.nonEmpty) {
+    val resolvedDriverContainer = if (resolvedSparkJars.nonEmpty) {
       new ContainerBuilder(driverSpec.driverContainer)
         .addNewEnv()
-        .withName(ENV_MOUNTED_CLASSPATH)
-        .withValue(resolvedClasspath.mkString(File.pathSeparator))
-        .endEnv()
+          .withName(ENV_MOUNTED_CLASSPATH)
+          .withValue(resolvedSparkJars.mkString(File.pathSeparator))
+          .endEnv()
         .build()
     } else {
       driverSpec.driverContainer

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
deleted file mode 100644
index 9fb3daf..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.StringWriter
-import java.util.Properties
-
-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, ContainerBuilder, HasMetadata}
-
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
-
-/**
- * Configures the driver init-container that localizes remote dependencies into the driver pod.
- * It applies the given InitContainerConfigurationSteps in the given order to produce a final
- * InitContainerSpec that is then used to configure the driver pod with the init-container attached.
- * It also builds a ConfigMap that will be mounted into the init-container. The ConfigMap carries
- * configuration properties for the init-container.
- */
-private[spark] class DriverInitContainerBootstrapStep(
-    steps: Seq[InitContainerConfigurationStep],
-    configMapName: String,
-    configMapKey: String)
-  extends DriverConfigurationStep {
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    var initContainerSpec = InitContainerSpec(
-      properties = Map.empty[String, String],
-      driverSparkConf = Map.empty[String, String],
-      initContainer = new ContainerBuilder().build(),
-      driverContainer = driverSpec.driverContainer,
-      driverPod = driverSpec.driverPod,
-      dependentResources = Seq.empty[HasMetadata])
-    for (nextStep <- steps) {
-      initContainerSpec = nextStep.configureInitContainer(initContainerSpec)
-    }
-
-    val configMap = buildConfigMap(
-      configMapName,
-      configMapKey,
-      initContainerSpec.properties)
-    val resolvedDriverSparkConf = driverSpec.driverSparkConf
-      .clone()
-      .set(INIT_CONTAINER_CONFIG_MAP_NAME, configMapName)
-      .set(INIT_CONTAINER_CONFIG_MAP_KEY_CONF, configMapKey)
-      .setAll(initContainerSpec.driverSparkConf)
-    val resolvedDriverPod = KubernetesUtils.appendInitContainer(
-      initContainerSpec.driverPod, initContainerSpec.initContainer)
-
-    driverSpec.copy(
-      driverPod = resolvedDriverPod,
-      driverContainer = initContainerSpec.driverContainer,
-      driverSparkConf = resolvedDriverSparkConf,
-      otherKubernetesResources =
-        driverSpec.otherKubernetesResources ++
-          initContainerSpec.dependentResources ++
-          Seq(configMap))
-  }
-
-  private def buildConfigMap(
-      configMapName: String,
-      configMapKey: String,
-      config: Map[String, String]): ConfigMap = {
-    val properties = new Properties()
-    config.foreach { entry =>
-      properties.setProperty(entry._1, entry._2)
-    }
-    val propertiesWriter = new StringWriter()
-    properties.store(propertiesWriter,
-      s"Java properties built from Kubernetes config map with name: $configMapName " +
-        s"and config map key: $configMapKey")
-    new ConfigMapBuilder()
-      .withNewMetadata()
-        .withName(configMapName)
-        .endMetadata()
-      .addToData(configMapKey, propertiesWriter.toString)
-      .build()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
index ccc1890..2424e63 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
@@ -99,7 +99,7 @@ private[spark] class DriverKubernetesCredentialsStep(
       }.getOrElse(driverSpec.driverPod)
     )
 
-    val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { secret =>
+    val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { _ =>
       new ContainerBuilder(driverSpec.driverContainer)
         .addNewVolumeMount()
           .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
deleted file mode 100644
index 0146985..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-
-/**
- * Performs basic configuration for the driver init-container with most of the work delegated to
- * the given InitContainerBootstrap.
- */
-private[spark] class BasicInitContainerConfigurationStep(
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String],
-    jarsDownloadPath: String,
-    filesDownloadPath: String,
-    bootstrap: InitContainerBootstrap)
-  extends InitContainerConfigurationStep {
-
-  override def configureInitContainer(spec: InitContainerSpec): InitContainerSpec = {
-    val remoteJarsToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkJars)
-    val remoteFilesToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkFiles)
-    val remoteJarsConf = if (remoteJarsToDownload.nonEmpty) {
-      Map(INIT_CONTAINER_REMOTE_JARS.key -> remoteJarsToDownload.mkString(","))
-    } else {
-      Map()
-    }
-    val remoteFilesConf = if (remoteFilesToDownload.nonEmpty) {
-      Map(INIT_CONTAINER_REMOTE_FILES.key -> remoteFilesToDownload.mkString(","))
-    } else {
-      Map()
-    }
-
-    val baseInitContainerConfig = Map(
-      JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath,
-      FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath) ++
-      remoteJarsConf ++
-      remoteFilesConf
-
-    val bootstrapped = bootstrap.bootstrapInitContainer(
-      PodWithDetachedInitContainer(
-        spec.driverPod,
-        spec.initContainer,
-        spec.driverContainer))
-
-    spec.copy(
-      initContainer = bootstrapped.initContainer,
-      driverContainer = bootstrapped.mainContainer,
-      driverPod = bootstrapped.pod,
-      properties = spec.properties ++ baseInitContainerConfig)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
deleted file mode 100644
index f2c29c7..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-
-/**
- * Figures out and returns the complete ordered list of InitContainerConfigurationSteps required to
- * configure the driver init-container. The returned steps will be applied in the given order to
- * produce a final InitContainerSpec that is used to construct the driver init-container in
- * DriverInitContainerBootstrapStep. This class is only used when an init-container is needed, i.e.,
- * when there are remote application dependencies to localize.
- */
-private[spark] class InitContainerConfigOrchestrator(
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String],
-    jarsDownloadPath: String,
-    filesDownloadPath: String,
-    imagePullPolicy: String,
-    configMapName: String,
-    configMapKey: String,
-    sparkConf: SparkConf) {
-
-  private val initContainerImage = sparkConf
-    .get(INIT_CONTAINER_IMAGE)
-    .getOrElse(throw new SparkException(
-      "Must specify the init-container image when there are remote dependencies"))
-
-  def getAllConfigurationSteps: Seq[InitContainerConfigurationStep] = {
-    val initContainerBootstrap = new InitContainerBootstrap(
-      initContainerImage,
-      imagePullPolicy,
-      jarsDownloadPath,
-      filesDownloadPath,
-      configMapName,
-      configMapKey,
-      SPARK_POD_DRIVER_ROLE,
-      sparkConf)
-    val baseStep = new BasicInitContainerConfigurationStep(
-      sparkJars,
-      sparkFiles,
-      jarsDownloadPath,
-      filesDownloadPath,
-      initContainerBootstrap)
-
-    val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_DRIVER_SECRETS_PREFIX)
-    // Mount user-specified driver secrets also into the driver's init-container. The
-    // init-container may need credentials in the secrets to be able to download remote
-    // dependencies. The driver's main container and its init-container share the secrets
-    // because the init-container is sort of an implementation details and this sharing
-    // avoids introducing a dedicated configuration property just for the init-container.
-    val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
-      Seq(new InitContainerMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
-    } else {
-      Nil
-    }
-
-    Seq(baseStep) ++ mountSecretsStep
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
deleted file mode 100644
index 0372ad5..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-/**
- * Represents a step in configuring the driver init-container.
- */
-private[spark] trait InitContainerConfigurationStep {
-
-  def configureInitContainer(spec: InitContainerSpec): InitContainerSpec
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
deleted file mode 100644
index 0daa7b9..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-
-/**
- * An init-container configuration step for mounting user-specified secrets onto user-specified
- * paths.
- *
- * @param bootstrap a utility actually handling mounting of the secrets
- */
-private[spark] class InitContainerMountSecretsStep(
-    bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
-
-  override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
-    // Mount the secret volumes given that the volumes have already been added to the driver pod
-    // when mounting the secrets into the main driver container.
-    val initContainer = bootstrap.mountSecrets(spec.initContainer)
-    spec.copy(initContainer = initContainer)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
deleted file mode 100644
index b52c343..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}
-
-/**
- * Represents a specification of the init-container for the driver pod.
- *
- * @param properties properties that should be set on the init-container
- * @param driverSparkConf Spark configuration properties that will be carried back to the driver
- * @param initContainer the init-container object
- * @param driverContainer the driver container object
- * @param driverPod the driver pod object
- * @param dependentResources resources the init-container depends on to work
- */
-private[spark] case class InitContainerSpec(
-    properties: Map[String, String],
-    driverSparkConf: Map[String, String],
-    initContainer: Container,
-    driverContainer: Container,
-    driverPod: Pod,
-    dependentResources: Seq[HasMetadata])

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 141bd28..98cbd56 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, PodWithDetachedInitContainer}
+import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
@@ -34,18 +34,10 @@ import org.apache.spark.util.Utils
  * @param sparkConf Spark configuration
  * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
  *                              user-specified paths into the executor container
- * @param initContainerBootstrap an optional component for bootstrapping the executor init-container
- *                               if one is needed, i.e., when there are remote dependencies to
- *                               localize
- * @param initContainerMountSecretsBootstrap an optional component for mounting user-specified
- *                                           secrets onto user-specified paths into the executor
- *                                           init-container
  */
 private[spark] class ExecutorPodFactory(
     sparkConf: SparkConf,
-    mountSecretsBootstrap: Option[MountSecretsBootstrap],
-    initContainerBootstrap: Option[InitContainerBootstrap],
-    initContainerMountSecretsBootstrap: Option[MountSecretsBootstrap]) {
+    mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
 
   private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
 
@@ -94,8 +86,6 @@ private[spark] class ExecutorPodFactory(
   private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
   private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
-  private val executorJarsDownloadDir = sparkConf.get(JARS_DOWNLOAD_LOCATION)
-
   /**
    * Configure and construct an executor pod with the given parameters.
    */
@@ -147,8 +137,9 @@ private[spark] class ExecutorPodFactory(
       (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId),
-      (ENV_EXECUTOR_ID, executorId),
-      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs)
+      // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
+      (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
+      (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs)
       .map(env => new EnvVarBuilder()
         .withName(env._1)
         .withValue(env._2)
@@ -221,30 +212,10 @@ private[spark] class ExecutorPodFactory(
         (bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
       }.getOrElse((executorPod, containerWithLimitCores))
 
-    val (bootstrappedPod, bootstrappedContainer) =
-      initContainerBootstrap.map { bootstrap =>
-        val podWithInitContainer = bootstrap.bootstrapInitContainer(
-          PodWithDetachedInitContainer(
-            maybeSecretsMountedPod,
-            new ContainerBuilder().build(),
-            maybeSecretsMountedContainer))
-
-        val (pod, mayBeSecretsMountedInitContainer) =
-          initContainerMountSecretsBootstrap.map { bootstrap =>
-            // Mount the secret volumes given that the volumes have already been added to the
-            // executor pod when mounting the secrets into the main executor container.
-            (podWithInitContainer.pod, bootstrap.mountSecrets(podWithInitContainer.initContainer))
-          }.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))
-
-        val bootstrappedPod = KubernetesUtils.appendInitContainer(
-          pod, mayBeSecretsMountedInitContainer)
-
-        (bootstrappedPod, podWithInitContainer.mainContainer)
-      }.getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
 
-    new PodBuilder(bootstrappedPod)
+    new PodBuilder(maybeSecretsMountedPod)
       .editSpec()
-        .addToContainers(bootstrappedContainer)
+        .addToContainers(maybeSecretsMountedContainer)
         .endSpec()
       .build()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index a942db6..ff5f680 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,7 +21,7 @@ import java.io.File
 import io.fabric8.kubernetes.client.Config
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
@@ -33,7 +33,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
   override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
 
   override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-    if (masterURL.startsWith("k8s") && sc.deployMode == "client") {
+    if (masterURL.startsWith("k8s") &&
+      sc.deployMode == "client" &&
+      !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
       throw new SparkException("Client mode is currently not supported for Kubernetes.")
     }
 
@@ -44,74 +46,23 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       sc: SparkContext,
       masterURL: String,
       scheduler: TaskScheduler): SchedulerBackend = {
-    val sparkConf = sc.getConf
-    val initContainerConfigMap = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_NAME)
-    val initContainerConfigMapKey = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_KEY_CONF)
-
-    if (initContainerConfigMap.isEmpty) {
-      logWarning("The executor's init-container config map is not specified. Executors will " +
-        "therefore not attempt to fetch remote or submitted dependencies.")
-    }
-
-    if (initContainerConfigMapKey.isEmpty) {
-      logWarning("The executor's init-container config map key is not specified. Executors will " +
-        "therefore not attempt to fetch remote or submitted dependencies.")
-    }
-
-    // Only set up the bootstrap if they've provided both the config map key and the config map
-    // name. The config map might not be provided if init-containers aren't being used to
-    // bootstrap dependencies.
-    val initContainerBootstrap = for {
-      configMap <- initContainerConfigMap
-      configMapKey <- initContainerConfigMapKey
-    } yield {
-      val initContainerImage = sparkConf
-        .get(INIT_CONTAINER_IMAGE)
-        .getOrElse(throw new SparkException(
-          "Must specify the init-container image when there are remote dependencies"))
-      new InitContainerBootstrap(
-        initContainerImage,
-        sparkConf.get(CONTAINER_IMAGE_PULL_POLICY),
-        sparkConf.get(JARS_DOWNLOAD_LOCATION),
-        sparkConf.get(FILES_DOWNLOAD_LOCATION),
-        configMap,
-        configMapKey,
-        SPARK_POD_EXECUTOR_ROLE,
-        sparkConf)
-    }
-
     val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+      sc.conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
     val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
       Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
     } else {
       None
     }
-    // Mount user-specified executor secrets also into the executor's init-container. The
-    // init-container may need credentials in the secrets to be able to download remote
-    // dependencies. The executor's main container and its init-container share the secrets
-    // because the init-container is sort of an implementation details and this sharing
-    // avoids introducing a dedicated configuration property just for the init-container.
-    val initContainerMountSecretsBootstrap = if (initContainerBootstrap.nonEmpty &&
-      executorSecretNamesToMountPaths.nonEmpty) {
-      Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
-    } else {
-      None
-    }
 
     val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
       KUBERNETES_MASTER_INTERNAL_URL,
-      Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+      Some(sc.conf.get(KUBERNETES_NAMESPACE)),
       KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
-      sparkConf,
+      sc.conf,
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-    val executorPodFactory = new ExecutorPodFactory(
-      sparkConf,
-      mountSecretBootstrap,
-      initContainerBootstrap,
-      initContainerMountSecretsBootstrap)
+    val executorPodFactory = new ExecutorPodFactory(sc.conf, mountSecretBootstrap)
 
     val allocatorExecutor = ThreadUtils
       .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")

http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
deleted file mode 100644
index e0f29ec..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import java.io.File
-import java.util.UUID
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import org.mockito.Mockito
-import org.scalatest.BeforeAndAfter
-import org.scalatest.mockito.MockitoSugar._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.util.Utils
-
-class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter {
-
-  private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt")
-  private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt")
-
-  private var downloadJarsDir: File = _
-  private var downloadFilesDir: File = _
-  private var downloadJarsSecretValue: String = _
-  private var downloadFilesSecretValue: String = _
-  private var fileFetcher: FileFetcher = _
-
-  override def beforeAll(): Unit = {
-    downloadJarsSecretValue = Files.toString(
-      new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8)
-    downloadFilesSecretValue = Files.toString(
-      new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8)
-  }
-
-  before {
-    downloadJarsDir = Utils.createTempDir()
-    downloadFilesDir = Utils.createTempDir()
-    fileFetcher = mock[FileFetcher]
-  }
-
-  after {
-    downloadJarsDir.delete()
-    downloadFilesDir.delete()
-  }
-
-  test("Downloads from remote server should invoke the file fetcher") {
-    val sparkConf = getSparkConfForRemoteFileDownloads
-    val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher)
-    initContainerUnderTest.run()
-    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
-    Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
-    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
-  }
-
-  private def getSparkConfForRemoteFileDownloads: SparkConf = {
-    new SparkConf(true)
-      .set(INIT_CONTAINER_REMOTE_JARS,
-        "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar")
-      .set(INIT_CONTAINER_REMOTE_FILES,
-        "http://localhost:9000/file.txt")
-      .set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
-      .set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
-  }
-
-  private def createTempFile(extension: String): String = {
-    val dir = Utils.createTempDir()
-    val file = new File(dir, s"${UUID.randomUUID().toString}.$extension")
-    Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8)
-    file.getAbsolutePath
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org