Posted to commits@spark.apache.org by fo...@apache.org on 2018/04/13 15:44:23 UTC

[3/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

[SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

## What changes were proposed in this pull request?

Breaks down the construction of driver pods and executor pods so that both spark-submit (which creates the driver) and KubernetesClusterSchedulerBackend (which creates executors) go through a common abstraction. This encourages more code reuse and is more legible than the previous approach.
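
The common abstraction is a feature-step trait, added in this patch as KubernetesFeatureConfigStep (full file in the diff below). As a quick orientation, each step exposes three hooks, sketched here:

    import io.fabric8.kubernetes.api.model.HasMetadata
    import org.apache.spark.deploy.k8s.SparkPod

    // Shape of the shared abstraction (matches the trait added below).
    trait KubernetesFeatureConfigStep {
      // Immutably transform the pod and its primary container.
      def configurePod(pod: SparkPod): SparkPod
      // Extra JVM system properties this feature requires.
      def getAdditionalPodSystemProperties(): Map[String, String]
      // Extra Kubernetes resources (driver-only, cluster mode).
      def getAdditionalKubernetesResources(): Seq[HasMetadata]
    }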

The high-level design is discussed in more detail on the JIRA ticket. This pull request implements that design, with some minor changes in the details.

No user-facing behavior should break as a result of this change.

## How was this patch tested?

Migrated all unit tests from the old submission-steps architecture to the new architecture. Integration tests should not need to change, and should continue to pass, since this change does not alter any outward behavior.

Author: mcheah <mc...@palantir.com>

Closes #20910 from mccheah/spark-22839-incremental.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a83ae0d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a83ae0d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a83ae0d9

Branch: refs/heads/master
Commit: a83ae0d9bc1b8f4909b9338370efe4020079bea7
Parents: 0323e61
Author: mcheah <mc...@palantir.com>
Authored: Fri Apr 13 08:43:58 2018 -0700
Committer: Anirudh Ramanathan <ra...@google.com>
Committed: Fri Apr 13 08:43:58 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/k8s/Config.scala    |   2 +-
 .../spark/deploy/k8s/KubernetesConf.scala       | 184 ++++++++++++++
 .../spark/deploy/k8s/KubernetesDriverSpec.scala |  31 +++
 .../spark/deploy/k8s/KubernetesUtils.scala      |  11 -
 .../deploy/k8s/MountSecretsBootstrap.scala      |  72 ------
 .../org/apache/spark/deploy/k8s/SparkPod.scala  |  34 +++
 .../k8s/features/BasicDriverFeatureStep.scala   | 136 ++++++++++
 .../k8s/features/BasicExecutorFeatureStep.scala | 179 ++++++++++++++
 ...DriverKubernetesCredentialsFeatureStep.scala | 216 ++++++++++++++++
 .../k8s/features/DriverServiceFeatureStep.scala |  97 ++++++++
 .../features/KubernetesFeatureConfigStep.scala  |  71 ++++++
 .../k8s/features/MountSecretsFeatureStep.scala  |  62 +++++
 .../k8s/submit/DriverConfigOrchestrator.scala   | 145 -----------
 .../submit/KubernetesClientApplication.scala    |  80 +++---
 .../k8s/submit/KubernetesDriverBuilder.scala    |  56 +++++
 .../k8s/submit/KubernetesDriverSpec.scala       |  47 ----
 .../steps/BasicDriverConfigurationStep.scala    | 163 ------------
 .../submit/steps/DependencyResolutionStep.scala |  61 -----
 .../submit/steps/DriverConfigurationStep.scala  |  30 ---
 .../steps/DriverKubernetesCredentialsStep.scala | 245 -------------------
 .../submit/steps/DriverMountSecretsStep.scala   |  38 ---
 .../steps/DriverServiceBootstrapStep.scala      | 104 --------
 .../cluster/k8s/ExecutorPodFactory.scala        | 227 -----------------
 .../cluster/k8s/KubernetesClusterManager.scala  |  12 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala |  20 +-
 .../cluster/k8s/KubernetesExecutorBuilder.scala |  41 ++++
 .../spark/deploy/k8s/KubernetesConfSuite.scala  | 175 +++++++++++++
 .../spark/deploy/k8s/KubernetesUtilsTest.scala  |  36 ---
 .../features/BasicDriverFeatureStepSuite.scala  | 153 ++++++++++++
 .../BasicExecutorFeatureStepSuite.scala         | 179 ++++++++++++++
 ...rKubernetesCredentialsFeatureStepSuite.scala | 174 +++++++++++++
 .../DriverServiceFeatureStepSuite.scala         | 227 +++++++++++++++++
 .../features/KubernetesFeaturesTestUtils.scala  |  61 +++++
 .../features/MountSecretsFeatureStepSuite.scala |  58 +++++
 .../spark/deploy/k8s/submit/ClientSuite.scala   | 216 ++++++++--------
 .../submit/DriverConfigOrchestratorSuite.scala  | 131 ----------
 .../submit/KubernetesDriverBuilderSuite.scala   | 102 ++++++++
 .../BasicDriverConfigurationStepSuite.scala     | 122 ---------
 .../steps/DependencyResolutionStepSuite.scala   |  69 ------
 .../DriverKubernetesCredentialsStepSuite.scala  | 153 ------------
 .../steps/DriverMountSecretsStepSuite.scala     |  49 ----
 .../steps/DriverServiceBootstrapStepSuite.scala | 180 --------------
 .../cluster/k8s/ExecutorPodFactorySuite.scala   | 195 ---------------
 ...KubernetesClusterSchedulerBackendSuite.scala |  37 +--
 .../k8s/KubernetesExecutorBuilderSuite.scala    |  75 ++++++
 45 files changed, 2482 insertions(+), 2274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 82f6c71..4086970 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -167,5 +167,5 @@ private[spark] object Config extends Logging {
   val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
   val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
 
-  val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
+  val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
new file mode 100644
index 0000000..77b634d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+/*
+ * Structure containing metadata for Kubernetes logic that builds a Spark driver.
+ */
+private[spark] case class KubernetesDriverSpecificConf(
+    mainAppResource: Option[MainAppResource],
+    mainClass: String,
+    appName: String,
+    appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+/*
+ * Structure containing metadata for Kubernetes logic that builds a Spark executor.
+ */
+private[spark] case class KubernetesExecutorSpecificConf(
+    executorId: String,
+    driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+/**
+ * Structure containing metadata for Kubernetes logic to build Spark pods.
+ */
+private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
+    sparkConf: SparkConf,
+    roleSpecificConf: T,
+    appResourceNamePrefix: String,
+    appId: String,
+    roleLabels: Map[String, String],
+    roleAnnotations: Map[String, String],
+    roleSecretNamesToMountPaths: Map[String, String],
+    roleEnvs: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+    .getOption("spark.jars")
+    .map(str => str.split(",").toSeq)
+    .getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+    .getOption("spark.files")
+    .map(str => str.split(",").toSeq)
+    .getOrElse(Seq.empty[String])
+
+  def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def imagePullSecrets(): Seq[LocalObjectReference] = {
+    sparkConf
+      .get(IMAGE_PULL_SECRETS)
+      .map(_.split(","))
+      .getOrElse(Array.empty[String])
+      .map(_.trim)
+      .map { secret =>
+        new LocalObjectReferenceBuilder().withName(secret).build()
+      }
+  }
+
+  def nodeSelector(): Map[String, String] =
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String): String = sparkConf.get(conf)
+
+  def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+}
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+      sparkConf: SparkConf,
+      appName: String,
+      appResourceNamePrefix: String,
+      appId: String,
+      mainAppResource: Option[MainAppResource],
+      mainClass: String,
+      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+    val sparkConfWithMainAppJar = sparkConf.clone()
+    mainAppResource.foreach {
+      case JavaMainAppResource(res) =>
+        val previousJars = sparkConf
+          .getOption("spark.jars")
+          .map(_.split(","))
+          .getOrElse(Array.empty)
+        if (!previousJars.contains(res)) {
+          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+        }
+    }
+
+    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
+    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
+      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
+      "operations.")
+    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
+      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
+      "operations.")
+    val driverLabels = driverCustomLabels ++ Map(
+      SPARK_APP_ID_LABEL -> appId,
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+    val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+    val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
+    val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
+
+    KubernetesConf(
+      sparkConfWithMainAppJar,
+      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
+      appResourceNamePrefix,
+      appId,
+      driverLabels,
+      driverAnnotations,
+      driverSecretNamesToMountPaths,
+      driverEnvs)
+  }
+
+  def createExecutorConf(
+      sparkConf: SparkConf,
+      executorId: String,
+      appId: String,
+      driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = {
+    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
+    require(
+      !executorCustomLabels.contains(SPARK_APP_ID_LABEL),
+      s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
+    require(
+      !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+      s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
+        " Spark.")
+    require(
+      !executorCustomLabels.contains(SPARK_ROLE_LABEL),
+      s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
+    val executorLabels = Map(
+      SPARK_EXECUTOR_ID_LABEL -> executorId,
+      SPARK_APP_ID_LABEL -> appId,
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
+      executorCustomLabels
+    val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+    val executorSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+    val executorEnv = sparkConf.getExecutorEnv.toMap
+
+    KubernetesConf(
+      sparkConf.clone(),
+      KubernetesExecutorSpecificConf(executorId, driverPod),
+      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
+      appId,
+      executorLabels,
+      executorAnnotations,
+      executorSecrets,
+      executorEnv)
+  }
+}
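
A hedged usage sketch of the two factory methods above (all literal values here are illustrative, not taken from the patch):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.k8s.KubernetesConf

    val sparkConf = new SparkConf()
    // Driver side: called from the spark-submit support code.
    val driverConf = KubernetesConf.createDriverConf(
      sparkConf,
      appName = "spark-pi",
      appResourceNamePrefix = "spark-pi-1234",
      appId = "spark-app-1234",
      mainAppResource = None,
      mainClass = "org.apache.spark.examples.SparkPi",
      appArgs = Array("100"))
    // Executor side: called by the scheduler backend; driverPod (a fabric8
    // Pod fetched from the API server) ties executors to the driver via
    // owner references, so it is left out of this sketch.
    // val executorConf = KubernetesConf.createExecutorConf(
    //   sparkConf, executorId = "1", appId = "spark-app-1234", driverPod)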

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
new file mode 100644
index 0000000..0c5ae02
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+private[spark] case class KubernetesDriverSpec(
+    pod: SparkPod,
+    driverKubernetesResources: Seq[HasMetadata],
+    systemProperties: Map[String, String])
+
+private[spark] object KubernetesDriverSpec {
+  def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
+    SparkPod.initialPod(),
+    Seq.empty,
+    initialProps)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 5b2bb81..ee62906 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -37,17 +37,6 @@ private[spark] object KubernetesUtils {
     sparkConf.getAllWithPrefix(prefix).toMap
   }
 
-  /**
-   * Parses comma-separated list of imagePullSecrets into K8s-understandable format
-   */
-  def parseImagePullSecrets(imagePullSecrets: Option[String]): List[LocalObjectReference] = {
-    imagePullSecrets match {
-      case Some(secretsCommaSeparated) =>
-        secretsCommaSeparated.split(',').map(_.trim).map(new LocalObjectReference(_)).toList
-      case None => Nil
-    }
-  }
-
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala
deleted file mode 100644
index c35e7db..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
-
-/**
- * Bootstraps a driver or executor container or an init-container with needed secrets mounted.
- */
-private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {
-
-  /**
-   * Add new secret volumes for the secrets specified in secretNamesToMountPaths into the given pod.
-   *
-   * @param pod the pod into which the secret volumes are being added.
-   * @return the updated pod with the secret volumes added.
-   */
-  def addSecretVolumes(pod: Pod): Pod = {
-    var podBuilder = new PodBuilder(pod)
-    secretNamesToMountPaths.keys.foreach { name =>
-      podBuilder = podBuilder
-        .editOrNewSpec()
-          .addNewVolume()
-            .withName(secretVolumeName(name))
-            .withNewSecret()
-              .withSecretName(name)
-              .endSecret()
-            .endVolume()
-          .endSpec()
-    }
-
-    podBuilder.build()
-  }
-
-  /**
-   * Mounts Kubernetes secret volumes of the secrets specified in secretNamesToMountPaths into the
-   * given container.
-   *
-   * @param container the container into which the secret volumes are being mounted.
-   * @return the updated container with the secrets mounted.
-   */
-  def mountSecrets(container: Container): Container = {
-    var containerBuilder = new ContainerBuilder(container)
-    secretNamesToMountPaths.foreach { case (name, path) =>
-      containerBuilder = containerBuilder
-        .addNewVolumeMount()
-          .withName(secretVolumeName(name))
-          .withMountPath(path)
-          .endVolumeMount()
-    }
-
-    containerBuilder.build()
-  }
-
-  private def secretVolumeName(secretName: String): String = {
-    secretName + "-volume"
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
new file mode 100644
index 0000000..345dd11
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
+
+private[spark] case class SparkPod(pod: Pod, container: Container)
+
+private[spark] object SparkPod {
+  def initialPod(): SparkPod = {
+    SparkPod(
+      new PodBuilder()
+        .withNewMetadata()
+        .endMetadata()
+        .withNewSpec()
+        .endSpec()
+        .build(),
+      new ContainerBuilder().build())
+  }
+}
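
SparkPod deliberately pairs the pod with its primary container so each feature step can evolve both immutably. A minimal sketch of how the new builders thread it through a sequence of steps (the committed KubernetesDriverBuilder and KubernetesExecutorBuilder follow this pattern; the helper name here is hypothetical):

    import org.apache.spark.deploy.k8s.SparkPod
    import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

    // Hypothetical helper: fold the feature steps over an initial empty pod.
    def applyFeatures(steps: Seq[KubernetesFeatureConfigStep]): SparkPod =
      steps.foldLeft(SparkPod.initialPod()) { (pod, step) =>
        step.configurePod(pod)
      }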

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
new file mode 100644
index 0000000..07bdccb
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] class BasicDriverFeatureStep(
+    conf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  private val driverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(s"${conf.appResourceNamePrefix}-driver")
+
+  private val driverContainerImage = conf
+    .get(DRIVER_CONTAINER_IMAGE)
+    .getOrElse(throw new SparkException("Must specify the driver container image"))
+
+  // CPU settings
+  private val driverCpuCores = conf.get("spark.driver.cores", "1")
+  private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
+  private val memoryOverheadMiB = conf
+    .get(DRIVER_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+  private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val driverCustomEnvs = conf.roleEnvs
+      .toSeq
+      .map { env =>
+        new EnvVarBuilder()
+          .withName(env._1)
+          .withValue(env._2)
+          .build()
+      }
+
+    val driverCpuQuantity = new QuantityBuilder(false)
+      .withAmount(driverCpuCores)
+      .build()
+    val driverMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverMemoryWithOverheadMiB}Mi")
+      .build()
+    val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
+      ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
+    }
+
+    val driverContainer = new ContainerBuilder(pod.container)
+      .withName(DRIVER_CONTAINER_NAME)
+      .withImage(driverContainerImage)
+      .withImagePullPolicy(conf.imagePullPolicy())
+      .addAllToEnv(driverCustomEnvs.asJava)
+      .addNewEnv()
+        .withName(ENV_DRIVER_BIND_ADDRESS)
+        .withValueFrom(new EnvVarSourceBuilder()
+          .withNewFieldRef("v1", "status.podIP")
+          .build())
+        .endEnv()
+      .withNewResources()
+        .addToRequests("cpu", driverCpuQuantity)
+        .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
+        .addToRequests("memory", driverMemoryQuantity)
+        .addToLimits("memory", driverMemoryQuantity)
+        .endResources()
+      .addToArgs("driver")
+      .addToArgs("--properties-file", SPARK_CONF_PATH)
+      .addToArgs("--class", conf.roleSpecificConf.mainClass)
+      // The user application jar is merged into the spark.jars list and managed through that
+      // property, so there is no need to reference it explicitly here.
+      .addToArgs(SparkLauncher.NO_RESOURCE)
+      .addToArgs(conf.roleSpecificConf.appArgs: _*)
+      .build()
+
+    val driverPod = new PodBuilder(pod.pod)
+      .editOrNewMetadata()
+        .withName(driverPodName)
+        .addToLabels(conf.roleLabels.asJava)
+        .addToAnnotations(conf.roleAnnotations.asJava)
+        .endMetadata()
+      .withNewSpec()
+        .withRestartPolicy("Never")
+        .withNodeSelector(conf.nodeSelector().asJava)
+        .addToImagePullSecrets(conf.imagePullSecrets(): _*)
+        .endSpec()
+      .build()
+    SparkPod(driverPod, driverContainer)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    val additionalProps = mutable.Map(
+      KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
+      "spark.app.id" -> conf.appId,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix,
+      KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true")
+
+    val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
+      conf.sparkJars())
+    val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
+      conf.sparkFiles())
+    if (resolvedSparkJars.nonEmpty) {
+      additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
+    }
+    if (resolvedSparkFiles.nonEmpty) {
+      additionalProps.put("spark.files", resolvedSparkFiles.mkString(","))
+    }
+    additionalProps.toMap
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
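
A worked example of the memory sizing above, assuming the defaults defined in Constants (MEMORY_OVERHEAD_FACTOR = 0.10, MEMORY_OVERHEAD_MIN_MIB = 384): with spark.driver.memory=1g, the overhead is max((0.10 * 1024).toInt, 384) = 384 MiB, so the container both requests and is limited to 1408Mi of memory.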

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
new file mode 100644
index 0000000..d220975
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class BasicExecutorFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
+  private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH)
+  private val executorContainerImage = kubernetesConf
+    .get(EXECUTOR_CONTAINER_IMAGE)
+    .getOrElse(throw new SparkException("Must specify the executor container image"))
+  private val blockManagerPort = kubernetesConf
+    .sparkConf
+    .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+  private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
+
+  private val driverUrl = RpcEndpointAddress(
+    kubernetesConf.get("spark.driver.host"),
+    kubernetesConf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
+  private val executorMemoryString = kubernetesConf.get(
+    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = kubernetesConf
+    .get(EXECUTOR_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
+
+  private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
+  private val executorCoresRequest =
+    if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
+      kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
+    } else {
+      executorCores.toString
+    }
+  private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val name = s"$executorPodNamePrefix-exec-${kubernetesConf.roleSpecificConf.executorId}"
+
+    // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
+    // name as the hostname.  This preserves uniqueness since the end of name contains
+    // executorId
+    val hostname = name.substring(Math.max(0, name.length - 63))
+    val executorMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(s"${executorMemoryWithOverhead}Mi")
+      .build()
+    val executorCpuQuantity = new QuantityBuilder(false)
+      .withAmount(executorCoresRequest)
+      .build()
+    val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
+      new EnvVarBuilder()
+        .withName(ENV_CLASSPATH)
+        .withValue(cp)
+        .build()
+    }
+    val executorExtraJavaOptionsEnv = kubernetesConf
+      .get(EXECUTOR_JAVA_OPTIONS)
+      .map { opts =>
+        val delimitedOpts = Utils.splitCommandString(opts)
+        delimitedOpts.zipWithIndex.map {
+          case (opt, index) =>
+            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+        }
+      }.getOrElse(Seq.empty[EnvVar])
+    val executorEnv = (Seq(
+      (ENV_DRIVER_URL, driverUrl),
+      (ENV_EXECUTOR_CORES, executorCores.toString),
+      (ENV_EXECUTOR_MEMORY, executorMemoryString),
+      (ENV_APPLICATION_ID, kubernetesConf.appId),
+      // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
+      (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
+      (ENV_EXECUTOR_ID, kubernetesConf.roleSpecificConf.executorId)) ++
+      kubernetesConf.roleEnvs)
+      .map(env => new EnvVarBuilder()
+        .withName(env._1)
+        .withValue(env._2)
+        .build()
+      ) ++ Seq(
+      new EnvVarBuilder()
+        .withName(ENV_EXECUTOR_POD_IP)
+        .withValueFrom(new EnvVarSourceBuilder()
+          .withNewFieldRef("v1", "status.podIP")
+          .build())
+        .build()
+    ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
+    val requiredPorts = Seq(
+      (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
+      .map { case (name, port) =>
+        new ContainerPortBuilder()
+          .withName(name)
+          .withContainerPort(port)
+          .build()
+      }
+
+    val executorContainer = new ContainerBuilder(pod.container)
+      .withName("executor")
+      .withImage(executorContainerImage)
+      .withImagePullPolicy(kubernetesConf.imagePullPolicy())
+      .withNewResources()
+        .addToRequests("memory", executorMemoryQuantity)
+        .addToLimits("memory", executorMemoryQuantity)
+        .addToRequests("cpu", executorCpuQuantity)
+        .endResources()
+      .addAllToEnv(executorEnv.asJava)
+      .withPorts(requiredPorts.asJava)
+      .addToArgs("executor")
+      .build()
+    val containerWithLimitCores = executorLimitCores.map { limitCores =>
+      val executorCpuLimitQuantity = new QuantityBuilder(false)
+        .withAmount(limitCores)
+        .build()
+      new ContainerBuilder(executorContainer)
+        .editResources()
+          .addToLimits("cpu", executorCpuLimitQuantity)
+          .endResources()
+        .build()
+    }.getOrElse(executorContainer)
+    val driverPod = kubernetesConf.roleSpecificConf.driverPod
+    val executorPod = new PodBuilder(pod.pod)
+      .editOrNewMetadata()
+        .withName(name)
+        .withLabels(kubernetesConf.roleLabels.asJava)
+        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+        .withOwnerReferences()
+        .addNewOwnerReference()
+          .withController(true)
+          .withApiVersion(driverPod.getApiVersion)
+          .withKind(driverPod.getKind)
+          .withName(driverPod.getMetadata.getName)
+          .withUid(driverPod.getMetadata.getUid)
+          .endOwnerReference()
+        .endMetadata()
+      .editOrNewSpec()
+        .withHostname(hostname)
+        .withRestartPolicy("Never")
+        .withNodeSelector(kubernetesConf.nodeSelector().asJava)
+        .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
+        .endSpec()
+      .build()
+    SparkPod(executorPod, containerWithLimitCores)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
new file mode 100644
index 0000000..ff5ad66
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf[_])
+  extends KubernetesFeatureConfigStep {
+  // TODO clean up this class, and credentials in general. See also SparkKubernetesClientFactory.
+  // We should use a struct to hold all creds-related fields. A lot of the code is very repetitive.
+
+  private val maybeMountedOAuthTokenFile = kubernetesConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = kubernetesConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = kubernetesConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = kubernetesConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = kubernetesConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  private val oauthTokenBase64 = kubernetesConf
+    .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+    .map { token =>
+      BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+    }
+
+  private val caCertDataBase64 = safeFileConfToBase64(
+    s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+    "Driver CA cert file")
+  private val clientKeyDataBase64 = safeFileConfToBase64(
+    s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+    "Driver client key file")
+  private val clientCertDataBase64 = safeFileConfToBase64(
+    s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+    "Driver client cert file")
+
+  // TODO decide whether or not to apply this step entirely in the caller, i.e. the builder.
+  private val shouldMountSecret = oauthTokenBase64.isDefined ||
+    caCertDataBase64.isDefined ||
+    clientKeyDataBase64.isDefined ||
+    clientCertDataBase64.isDefined
+
+  private val driverCredentialsSecretName =
+    s"${kubernetesConf.appResourceNamePrefix}-kubernetes-credentials"
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    if (!shouldMountSecret) {
+      pod.copy(
+        pod = driverServiceAccount.map { account =>
+          new PodBuilder(pod.pod)
+            .editOrNewSpec()
+              .withServiceAccount(account)
+              .withServiceAccountName(account)
+              .endSpec()
+            .build()
+        }.getOrElse(pod.pod))
+    } else {
+      val driverPodWithMountedKubernetesCredentials =
+        new PodBuilder(pod.pod)
+          .editOrNewSpec()
+            .addNewVolume()
+              .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+              .withNewSecret().withSecretName(driverCredentialsSecretName).endSecret()
+              .endVolume()
+          .endSpec()
+          .build()
+
+      val driverContainerWithMountedSecretVolume =
+        new ContainerBuilder(pod.container)
+          .addNewVolumeMount()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
+            .endVolumeMount()
+          .build()
+      SparkPod(driverPodWithMountedKubernetesCredentials, driverContainerWithMountedSecretVolume)
+    }
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    val resolvedMountedOAuthTokenFile = resolveSecretLocation(
+      maybeMountedOAuthTokenFile,
+      oauthTokenBase64,
+      DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
+    val resolvedMountedClientKeyFile = resolveSecretLocation(
+      maybeMountedClientKeyFile,
+      clientKeyDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
+    val resolvedMountedClientCertFile = resolveSecretLocation(
+      maybeMountedClientCertFile,
+      clientCertDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
+    val resolvedMountedCaCertFile = resolveSecretLocation(
+      maybeMountedCaCertFile,
+      caCertDataBase64,
+      DRIVER_CREDENTIALS_CA_CERT_PATH)
+
+    val redactedTokens = kubernetesConf.sparkConf.getAll
+      .filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX))
+      .toMap
+      .mapValues( _ => "<present_but_redacted>")
+    redactedTokens ++
+      resolvedMountedCaCertFile.map { file =>
+        Map(
+          s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
+            file)
+      }.getOrElse(Map.empty) ++
+      resolvedMountedClientKeyFile.map { file =>
+        Map(
+          s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
+            file)
+      }.getOrElse(Map.empty) ++
+      resolvedMountedClientCertFile.map { file =>
+        Map(
+          s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
+            file)
+      }.getOrElse(Map.empty) ++
+      resolvedMountedOAuthTokenFile.map { file =>
+        Map(
+          s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX" ->
+          file)
+      }.getOrElse(Map.empty)
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    if (shouldMountSecret) {
+      Seq(createCredentialsSecret())
+    } else {
+      Seq.empty
+    }
+  }
+
+  private def safeFileConfToBase64(conf: String, fileType: String): Option[String] = {
+    kubernetesConf.getOption(conf)
+      .map(new File(_))
+      .map { file =>
+        require(file.isFile, String.format("%s provided at %s does not exist or is not a file.",
+          fileType, file.getAbsolutePath))
+        BaseEncoding.base64().encode(Files.toByteArray(file))
+      }
+  }
+
+  /**
+   * Resolve a Kubernetes secret data entry from an optional client credential used by the
+   * driver to talk to the Kubernetes API server.
+   *
+   * @param userSpecifiedCredential the optional user-specified client credential.
+   * @param secretName name of the Kubernetes secret storing the client credential.
+   * @return a secret data entry in the form of a map from the secret name to the secret data,
+   *         which may be empty if the user-specified credential is empty.
+   */
+  private def resolveSecretData(
+    userSpecifiedCredential: Option[String],
+    secretName: String): Map[String, String] = {
+    userSpecifiedCredential.map { valueBase64 =>
+      Map(secretName -> valueBase64)
+    }.getOrElse(Map.empty[String, String])
+  }
+
+  private def resolveSecretLocation(
+    mountedUserSpecified: Option[String],
+    valueMountedFromSubmitter: Option[String],
+    mountedCanonicalLocation: String): Option[String] = {
+    mountedUserSpecified.orElse(valueMountedFromSubmitter.map { _ =>
+      mountedCanonicalLocation
+    })
+  }
+
+  private def createCredentialsSecret(): Secret = {
+    val allSecretData =
+      resolveSecretData(
+        clientKeyDataBase64,
+        DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
+        resolveSecretData(
+          clientCertDataBase64,
+          DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
+        resolveSecretData(
+          caCertDataBase64,
+          DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
+        resolveSecretData(
+          oauthTokenBase64,
+          DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
+
+    new SecretBuilder()
+      .withNewMetadata()
+        .withName(driverCredentialsSecretName)
+        .endMetadata()
+      .withData(allSecretData.asJava)
+      .build()
+  }
+
+}
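
The resolution order implied by resolveSecretLocation above, applied to each credential independently: a user-specified pre-mounted file path wins; failing that, if the submitter supplied the credential's contents, the canonical in-pod path under DRIVER_CREDENTIALS_SECRETS_BASE_DIR is used; otherwise the corresponding property is left unset.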

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
new file mode 100644
index 0000000..f2d7bbd
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, SystemClock}
+
+private[spark] class DriverServiceFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
+    clock: Clock = new SystemClock)
+  extends KubernetesFeatureConfigStep with Logging {
+  import DriverServiceFeatureStep._
+
+  require(kubernetesConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+    s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " +
+      "address is managed and set to the driver pod's IP address.")
+  require(kubernetesConf.getOption(DRIVER_HOST_KEY).isEmpty,
+    s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
+      "managed via a Kubernetes service.")
+
+  private val preferredServiceName = s"${kubernetesConf.appResourceNamePrefix}$DRIVER_SVC_POSTFIX"
+  private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
+    preferredServiceName
+  } else {
+    val randomServiceId = clock.getTimeMillis()
+    val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+    logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
+      s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
+      s"$shorterServiceName as the driver service's name.")
+    shorterServiceName
+  }
+
+  private val driverPort = kubernetesConf.sparkConf.getInt(
+    "spark.driver.port", DEFAULT_DRIVER_PORT)
+  private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt(
+    org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
+
+  override def configurePod(pod: SparkPod): SparkPod = pod
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
+    Map(DRIVER_HOST_KEY -> driverHostname,
+      "spark.driver.port" -> driverPort.toString,
+      org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key ->
+        driverBlockManagerPort.toString)
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    val driverService = new ServiceBuilder()
+      .withNewMetadata()
+        .withName(resolvedServiceName)
+        .endMetadata()
+      .withNewSpec()
+        .withClusterIP("None")
+        .withSelector(kubernetesConf.roleLabels.asJava)
+        .addNewPort()
+          .withName(DRIVER_PORT_NAME)
+          .withPort(driverPort)
+          .withNewTargetPort(driverPort)
+          .endPort()
+        .addNewPort()
+          .withName(BLOCK_MANAGER_PORT_NAME)
+          .withPort(driverBlockManagerPort)
+          .withNewTargetPort(driverBlockManagerPort)
+          .endPort()
+        .endSpec()
+      .build()
+    Seq(driverService)
+  }
+}
+
+private[spark] object DriverServiceFeatureStep {
+  val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
+  val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
+  val DRIVER_SVC_POSTFIX = "-driver-svc"
+  val MAX_SERVICE_NAME_LENGTH = 63
+}
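
For example (illustrative values): with appResourceNamePrefix "spark-pi-1234" and namespace "default", the service is named spark-pi-1234-driver-svc and the driver becomes reachable at spark-pi-1234-driver-svc.default.svc, which is the value written into spark.driver.host.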

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
new file mode 100644
index 0000000..4c1be3b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.SparkPod
+
+/**
+ * A collection of functions that together represent a "feature" in pods that are launched for
+ * Spark drivers and executors.
+ */
+private[spark] trait KubernetesFeatureConfigStep {
+
+  /**
+   * Apply modifications on the given pod in accordance to this feature. This can include attaching
+   * volumes, adding environment variables, and adding labels/annotations.
+   * <p>
+   * Note that we should return a SparkPod that keeps all of the properties of the passed SparkPod
+   * object. So this is correct:
+   * <pre>
+   * {@code val configuredPod = new PodBuilder(pod.pod)
+   *     .editSpec()
+   *     ...
+   *     .build()
+   *   val configuredContainer = new ContainerBuilder(pod.container)
+   *     ...
+   *     .build()
+   *   SparkPod(configuredPod, configuredContainer)
+   *  }
+   * </pre>
+   * This is incorrect:
+   * <pre>
+   * {@code val configuredPod = new PodBuilder() // Loses the original state
+   *     .editSpec()
+   *     ...
+   *     .build()
+   *   val configuredContainer = new ContainerBuilder() // Loses the original state
+   *     ...
+   *     .build()
+   *   SparkPod(configuredPod, configuredContainer)
+   *  }
+   * </pre>
+   */
+  def configurePod(pod: SparkPod): SparkPod
+
+  /**
+   * Return any system properties that should be set on the JVM in accordance to this feature.
+   */
+  def getAdditionalPodSystemProperties(): Map[String, String]
+
+  /**
+   * Return any additional Kubernetes resources that should be added to support this feature. Only
+   * applicable when creating the driver in cluster mode.
+   */
+  def getAdditionalKubernetesResources(): Seq[HasMetadata]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
new file mode 100644
index 0000000..97fa949
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+
+private[spark] class MountSecretsFeatureStep(
+    kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val addedVolumes = kubernetesConf
+      .roleSecretNamesToMountPaths
+      .keys
+      .map(secretName =>
+        new VolumeBuilder()
+          .withName(secretVolumeName(secretName))
+          .withNewSecret()
+            .withSecretName(secretName)
+            .endSecret()
+          .build())
+    val podWithVolumes = new PodBuilder(pod.pod)
+      .editOrNewSpec()
+        .addToVolumes(addedVolumes.toSeq: _*)
+        .endSpec()
+      .build()
+    val addedVolumeMounts = kubernetesConf
+      .roleSecretNamesToMountPaths
+      .map {
+        case (secretName, mountPath) =>
+          new VolumeMountBuilder()
+            .withName(secretVolumeName(secretName))
+            .withMountPath(mountPath)
+            .build()
+      }
+    val containerWithMounts = new ContainerBuilder(pod.container)
+      .addToVolumeMounts(addedVolumeMounts.toSeq: _*)
+      .build()
+    SparkPod(podWithVolumes, containerWithMounts)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+
+  private def secretVolumeName(secretName: String): String = s"$secretName-volume"
+}
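
A brief usage illustration for the step above (a sketch only: SparkPod.initialPod() is
assumed to come from the new SparkPod.scala added by this commit, and kubernetesConf is
assumed to be in scope):

  // Given a conf whose roleSecretNamesToMountPaths is Map("db-creds" -> "/etc/secrets"),
  val step = new MountSecretsFeatureStep(kubernetesConf)
  val configured = step.configurePod(SparkPod.initialPod())
  // the configured pod now carries a volume named "db-creds-volume" backed by the
  // "db-creds" secret, and the container mounts that volume at "/etc/secrets".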

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
deleted file mode 100644
index b4d3f04..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.steps._
-import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.util.SystemClock
-import org.apache.spark.util.Utils
-
-/**
- * Figures out and returns the complete ordered list of needed DriverConfigurationSteps to
- * configure the Spark driver pod. The returned steps will be applied one by one in the given
- * order to produce a final KubernetesDriverSpec that is used in KubernetesClientApplication
- * to construct and create the driver pod.
- */
-private[spark] class DriverConfigOrchestrator(
-    kubernetesAppId: String,
-    kubernetesResourceNamePrefix: String,
-    mainAppResource: Option[MainAppResource],
-    appName: String,
-    mainClass: String,
-    appArgs: Array[String],
-    sparkConf: SparkConf) {
-
-  // The resource name prefix is derived from the Spark application name, making it easy to connect
-  // the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
-  // application the user submitted.
-
-  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-
-  def getAllConfigurationSteps: Seq[DriverConfigurationStep] = {
-    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_DRIVER_LABEL_PREFIX)
-    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
-      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
-      "operations.")
-    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
-      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
-      "operations.")
-
-    val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_DRIVER_SECRETS_PREFIX)
-
-    val allDriverLabels = driverCustomLabels ++ Map(
-      SPARK_APP_ID_LABEL -> kubernetesAppId,
-      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
-
-    val initialSubmissionStep = new BasicDriverConfigurationStep(
-      kubernetesAppId,
-      kubernetesResourceNamePrefix,
-      allDriverLabels,
-      imagePullPolicy,
-      appName,
-      mainClass,
-      appArgs,
-      sparkConf)
-
-    val serviceBootstrapStep = new DriverServiceBootstrapStep(
-      kubernetesResourceNamePrefix,
-      allDriverLabels,
-      sparkConf,
-      new SystemClock)
-
-    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
-      sparkConf, kubernetesResourceNamePrefix)
-
-    val additionalMainAppJar = if (mainAppResource.nonEmpty) {
-       val mayBeResource = mainAppResource.get match {
-        case JavaMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
-          Some(resource)
-        case _ => None
-      }
-      mayBeResource
-    } else {
-      None
-    }
-
-    val sparkJars = sparkConf.getOption("spark.jars")
-      .map(_.split(","))
-      .getOrElse(Array.empty[String]) ++
-      additionalMainAppJar.toSeq
-    val sparkFiles = sparkConf.getOption("spark.files")
-      .map(_.split(","))
-      .getOrElse(Array.empty[String])
-
-    // TODO(SPARK-23153): remove once submission client local dependencies are supported.
-    if (existSubmissionLocalFiles(sparkJars) || existSubmissionLocalFiles(sparkFiles)) {
-      throw new SparkException("The Kubernetes mode does not yet support referencing application " +
-        "dependencies in the local file system.")
-    }
-
-    val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
-      Seq(new DependencyResolutionStep(
-        sparkJars,
-        sparkFiles))
-    } else {
-      Nil
-    }
-
-    val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
-      Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
-    } else {
-      Nil
-    }
-
-    Seq(
-      initialSubmissionStep,
-      serviceBootstrapStep,
-      kubernetesCredentialsStep) ++
-      dependencyResolutionStep ++
-      mountSecretsStep
-  }
-
-  private def existSubmissionLocalFiles(files: Seq[String]): Boolean = {
-    files.exists { uri =>
-      Utils.resolveURI(uri).getScheme == "file"
-    }
-  }
-
-  private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {
-    files.exists { uri =>
-      Utils.resolveURI(uri).getScheme != "local"
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index e16d1ad..a97f565 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -27,12 +27,10 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
-import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.util.Utils
 
 /**
@@ -43,9 +41,9 @@ import org.apache.spark.util.Utils
  * @param driverArgs arguments to the driver
  */
 private[spark] case class ClientArguments(
-     mainAppResource: Option[MainAppResource],
-     mainClass: String,
-     driverArgs: Array[String])
+    mainAppResource: Option[MainAppResource],
+    mainClass: String,
+    driverArgs: Array[String])
 
 private[spark] object ClientArguments {
 
@@ -80,8 +78,9 @@ private[spark] object ClientArguments {
  * watcher that monitors and logs the application status. Waits for the application to terminate if
  * spark.kubernetes.submission.waitAppCompletion is true.
  *
- * @param submissionSteps steps that collectively configure the driver
- * @param sparkConf the submission client Spark configuration
+ * @param builder responsible for building the base driver pod based on a composition of
+ *                implemented features.
+ * @param kubernetesConf application configuration
  * @param kubernetesClient the client to talk to the Kubernetes API server
  * @param waitForAppCompletion a flag indicating whether the client should wait for the application
  *                             to complete
@@ -89,31 +88,21 @@ private[spark] object ClientArguments {
  * @param watcher a watcher that monitors and logs the application status
  */
 private[spark] class Client(
-    submissionSteps: Seq[DriverConfigurationStep],
-    sparkConf: SparkConf,
+    builder: KubernetesDriverBuilder,
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
     kubernetesClient: KubernetesClient,
     waitForAppCompletion: Boolean,
     appName: String,
     watcher: LoggingPodStatusWatcher,
     kubernetesResourceNamePrefix: String) extends Logging {
 
-   /**
-    * Run command that initializes a DriverSpec that will be updated after each
-    * DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
-    * will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
-    */
   def run(): Unit = {
-    var currentDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf)
-    // submissionSteps contain steps necessary to take, to resolve varying
-    // client arguments that are passed in, created by orchestrator
-    for (nextStep <- submissionSteps) {
-      currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
-    }
+    val resolvedDriverSpec = builder.buildFromFeatures(kubernetesConf)
     val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
-    val configMap = buildConfigMap(configMapName, currentDriverSpec.driverSparkConf)
+    val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
    // The "SPARK_CONF_DIR" env var is included so that the Spark command builder
    // can pick up the Java options present in the ConfigMap
-    val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
+    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
       .addNewEnv()
         .withName(ENV_SPARK_CONF_DIR)
         .withValue(SPARK_CONF_DIR_INTERNAL)
@@ -123,7 +112,7 @@ private[spark] class Client(
         .withMountPath(SPARK_CONF_DIR_INTERNAL)
         .endVolumeMount()
       .build()
-    val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
+    val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
       .editSpec()
         .addToContainers(resolvedDriverContainer)
         .addNewVolume()
@@ -141,12 +130,10 @@ private[spark] class Client(
         .watch(watcher)) { _ =>
       val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
       try {
-        if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
-          val otherKubernetesResources =
-            currentDriverSpec.otherKubernetesResources ++ Seq(configMap)
-          addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-          kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-        }
+        val otherKubernetesResources =
+          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
       } catch {
         case NonFatal(e) =>
           kubernetesClient.pods().delete(createdDriverPod)
@@ -180,20 +167,17 @@ private[spark] class Client(
   }
 
   // Build a Config Map that will house spark conf properties in a single file for spark-submit
-  private def buildConfigMap(configMapName: String, conf: SparkConf): ConfigMap = {
+  private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
     val properties = new Properties()
-    conf.getAll.foreach { case (k, v) =>
+    conf.foreach { case (k, v) =>
       properties.setProperty(k, v)
     }
     val propertiesWriter = new StringWriter()
     properties.store(propertiesWriter,
       s"Java properties built from Kubernetes config map with name: $configMapName")
-
-    val namespace = conf.get(KUBERNETES_NAMESPACE)
     new ConfigMapBuilder()
       .withNewMetadata()
         .withName(configMapName)
-        .withNamespace(namespace)
         .endMetadata()
       .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
       .build()
@@ -211,7 +195,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
   }
 
   private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
-    val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
+    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
     // For constructing the app ID, we can't use the Spark application name, as the app ID is going
     // to be added as a label to group resources belonging to the same application. Label values are
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
@@ -219,10 +203,19 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
     val launchTime = System.currentTimeMillis()
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
-    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
     val kubernetesResourceNamePrefix = {
       s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
     }
+    val kubernetesConf = KubernetesConf.createDriverConf(
+      sparkConf,
+      appName,
+      kubernetesResourceNamePrefix,
+      kubernetesAppId,
+      clientArguments.mainAppResource,
+      clientArguments.mainClass,
+      clientArguments.driverArgs)
+    val builder = new KubernetesDriverBuilder
+    val namespace = kubernetesConf.namespace()
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
     val master = sparkConf.get("spark.master").substring("k8s://".length)
@@ -230,15 +223,6 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
 
     val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
 
-    val orchestrator = new DriverConfigOrchestrator(
-      kubernetesAppId,
-      kubernetesResourceNamePrefix,
-      clientArguments.mainAppResource,
-      appName,
-      clientArguments.mainClass,
-      clientArguments.driverArgs,
-      sparkConf)
-
     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,
       Some(namespace),
@@ -247,8 +231,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       None,
       None)) { kubernetesClient =>
         val client = new Client(
-          orchestrator.getAllConfigurationSteps,
-          sparkConf,
+          builder,
+          kubernetesConf,
           kubernetesClient,
           waitForAppCompletion,
           appName,
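
Taken together, the new submission flow in this file condenses to the following sketch
(all names appear in the diff above; surrounding values such as sparkConf,
kubernetesClient, waitForAppCompletion, and watcher are assumed to be in scope):

  val kubernetesConf = KubernetesConf.createDriverConf(
    sparkConf,
    appName,
    kubernetesResourceNamePrefix,
    kubernetesAppId,
    clientArguments.mainAppResource,
    clientArguments.mainClass,
    clientArguments.driverArgs)
  val client = new Client(
    new KubernetesDriverBuilder,
    kubernetesConf,
    kubernetesClient,
    waitForAppCompletion,
    appName,
    watcher,
    kubernetesResourceNamePrefix)
  client.run()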

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
new file mode 100644
index 0000000..c7579ed
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountSecretsFeatureStep}
+
+private[spark] class KubernetesDriverBuilder(
+    provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
+      new BasicDriverFeatureStep(_),
+    provideCredentialsStep: (KubernetesConf[KubernetesDriverSpecificConf])
+      => DriverKubernetesCredentialsFeatureStep =
+      new DriverKubernetesCredentialsFeatureStep(_),
+    provideServiceStep: (KubernetesConf[KubernetesDriverSpecificConf]) => DriverServiceFeatureStep =
+      new DriverServiceFeatureStep(_),
+    provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+      => MountSecretsFeatureStep) =
+      new MountSecretsFeatureStep(_)) {
+
+  def buildFromFeatures(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
+    val baseFeatures = Seq(
+      provideBasicStep(kubernetesConf),
+      provideCredentialsStep(kubernetesConf),
+      provideServiceStep(kubernetesConf))
+    val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
+    } else baseFeatures
+
+    var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
+    for (feature <- allFeatures) {
+      val configuredPod = feature.configurePod(spec.pod)
+      val addedSystemProperties = feature.getAdditionalPodSystemProperties()
+      val addedResources = feature.getAdditionalKubernetesResources()
+      spec = KubernetesDriverSpec(
+        configuredPod,
+        spec.driverKubernetesResources ++ addedResources,
+        spec.systemProperties ++ addedSystemProperties)
+    }
+    spec
+  }
+}
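
The provide*Step constructor parameters default to the real feature steps but can be
swapped for stubs in unit tests. For readers tracing the accumulation semantics, the
loop in buildFromFeatures is equivalent to this foldLeft (a sketch; the three-field
constructor order follows the KubernetesDriverSpec call inside the loop above):

  val spec = allFeatures.foldLeft(
      KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)) {
    (currentSpec, feature) =>
      KubernetesDriverSpec(
        feature.configurePod(currentSpec.pod),
        currentSpec.driverKubernetesResources ++ feature.getAdditionalKubernetesResources(),
        currentSpec.systemProperties ++ feature.getAdditionalPodSystemProperties())
  }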

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala
deleted file mode 100644
index db13f09..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, HasMetadata, Pod, PodBuilder}
-
-import org.apache.spark.SparkConf
-
-/**
- * Represents the components and characteristics of a Spark driver. The driver can be considered
- * as being comprised of the driver pod itself, any other Kubernetes resources that the driver
- * pod depends on, and the SparkConf that should be supplied to the Spark application. The driver
- * container should be operated on via the specific field of this case class as opposed to trying
- * to edit the container directly on the pod. The driver container should be attached at the
- * end of executing all submission steps.
- */
-private[spark] case class KubernetesDriverSpec(
-    driverPod: Pod,
-    driverContainer: Container,
-    otherKubernetesResources: Seq[HasMetadata],
-    driverSparkConf: SparkConf)
-
-private[spark] object KubernetesDriverSpec {
-  def initialSpec(initialSparkConf: SparkConf): KubernetesDriverSpec = {
-    KubernetesDriverSpec(
-      // Set new metadata and a new spec so that submission steps can use
-      // PodBuilder#editMetadata() and/or PodBuilder#editSpec() safely.
-      new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
-      new ContainerBuilder().build(),
-      Seq.empty[HasMetadata],
-      initialSparkConf.clone())
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
deleted file mode 100644
index fcb1db8..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model._
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
-import org.apache.spark.launcher.SparkLauncher
-
-/**
- * Performs basic configuration for the driver pod.
- */
-private[spark] class BasicDriverConfigurationStep(
-    kubernetesAppId: String,
-    resourceNamePrefix: String,
-    driverLabels: Map[String, String],
-    imagePullPolicy: String,
-    appName: String,
-    mainClass: String,
-    appArgs: Array[String],
-    sparkConf: SparkConf) extends DriverConfigurationStep {
-
-  private val driverPodName = sparkConf
-    .get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(s"$resourceNamePrefix-driver")
-
-  private val driverExtraClasspath = sparkConf.get(DRIVER_CLASS_PATH)
-
-  private val driverContainerImage = sparkConf
-    .get(DRIVER_CONTAINER_IMAGE)
-    .getOrElse(throw new SparkException("Must specify the driver container image"))
-
-  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
-
-  // CPU settings
-  private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
-  private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
-
-  // Memory settings
-  private val driverMemoryMiB = sparkConf.get(DRIVER_MEMORY)
-  private val memoryOverheadMiB = sparkConf
-    .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
-  private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
-      new EnvVarBuilder()
-        .withName(ENV_CLASSPATH)
-        .withValue(classPath)
-        .build()
-    }
-
-    val driverCustomAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
-    require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
-      s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
-        " Spark bookkeeping operations.")
-
-    val driverCustomEnvs = sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
-      .map { env =>
-        new EnvVarBuilder()
-          .withName(env._1)
-          .withValue(env._2)
-          .build()
-      }
-
-    val driverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
-
-    val nodeSelector = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
-
-    val driverCpuQuantity = new QuantityBuilder(false)
-      .withAmount(driverCpuCores)
-      .build()
-    val driverMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(s"${driverMemoryWithOverheadMiB}Mi")
-      .build()
-    val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
-      ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
-    }
-
-    val driverContainerWithoutArgs = new ContainerBuilder(driverSpec.driverContainer)
-      .withName(DRIVER_CONTAINER_NAME)
-      .withImage(driverContainerImage)
-      .withImagePullPolicy(imagePullPolicy)
-      .addAllToEnv(driverCustomEnvs.asJava)
-      .addToEnv(driverExtraClasspathEnv.toSeq: _*)
-      .addNewEnv()
-        .withName(ENV_DRIVER_BIND_ADDRESS)
-        .withValueFrom(new EnvVarSourceBuilder()
-          .withNewFieldRef("v1", "status.podIP")
-          .build())
-        .endEnv()
-      .withNewResources()
-        .addToRequests("cpu", driverCpuQuantity)
-        .addToRequests("memory", driverMemoryQuantity)
-        .addToLimits("memory", driverMemoryQuantity)
-        .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
-        .endResources()
-      .addToArgs("driver")
-      .addToArgs("--properties-file", SPARK_CONF_PATH)
-      .addToArgs("--class", mainClass)
-      // The user application jar is merged into the spark.jars list and managed through that
-      // property, so there is no need to reference it explicitly here.
-      .addToArgs(SparkLauncher.NO_RESOURCE)
-
-    val driverContainer = appArgs.toList match {
-      case "" :: Nil | Nil => driverContainerWithoutArgs.build()
-      case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build()
-    }
-
-    val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
-
-    val baseDriverPod = new PodBuilder(driverSpec.driverPod)
-      .editOrNewMetadata()
-        .withName(driverPodName)
-        .addToLabels(driverLabels.asJava)
-        .addToAnnotations(driverAnnotations.asJava)
-      .endMetadata()
-      .withNewSpec()
-        .withRestartPolicy("Never")
-        .withNodeSelector(nodeSelector.asJava)
-        .withImagePullSecrets(parsedImagePullSecrets.asJava)
-        .endSpec()
-      .build()
-
-    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
-      .setIfMissing(KUBERNETES_DRIVER_POD_NAME, driverPodName)
-      .set("spark.app.id", kubernetesAppId)
-      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, resourceNamePrefix)
-      // to set the config variables to allow client-mode spark-submit from driver
-      .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
-
-    driverSpec.copy(
-      driverPod = baseDriverPod,
-      driverSparkConf = resolvedSparkConf,
-      driverContainer = driverContainer)
-  }
-
-}
-

