You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fo...@apache.org on 2018/04/13 15:44:21 UTC

[1/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

Repository: spark
Updated Branches:
  refs/heads/master 0323e6146 -> a83ae0d9b


http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
new file mode 100644
index 0000000..9d02f56
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SecretVolumeUtils, SparkPod}
+
+class MountSecretsFeatureStepSuite extends SparkFunSuite {
+
+  private val SECRET_FOO = "foo"
+  private val SECRET_BAR = "bar"
+  private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
+
+  test("mounts all given secrets") {
+    val baseDriverPod = SparkPod.initialPod()
+    val secretNamesToMountPaths = Map(
+      SECRET_FOO -> SECRET_MOUNT_PATH,
+      SECRET_BAR -> SECRET_MOUNT_PATH)
+    val sparkConf = new SparkConf(false)
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesExecutorSpecificConf("1", new PodBuilder().build()),
+      "resource-name-prefix",
+      "app-id",
+      Map.empty,
+      Map.empty,
+      secretNamesToMountPaths,
+      Map.empty)
+
+    val step = new MountSecretsFeatureStep(kubernetesConf)
+    val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod
+    val driverContainerWithSecretsMounted = step.configurePod(baseDriverPod).container
+
+    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
+      assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName))
+    }
+    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
+      assert(SecretVolumeUtils.containerHasVolume(
+        driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 6a50159..c1b203e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -16,22 +16,17 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import scala.collection.JavaConverters._
-
-import com.google.common.collect.Iterables
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
 import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.Mockito.{doReturn, verify, when}
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
 import org.scalatest.mockito.MockitoSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
 
 class ClientSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -39,6 +34,74 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private val DRIVER_POD_API_VERSION = "v1"
   private val DRIVER_POD_KIND = "pod"
   private val KUBERNETES_RESOURCE_PREFIX = "resource-example"
+  private val POD_NAME = "driver"
+  private val CONTAINER_NAME = "container"
+  private val APP_ID = "app-id"
+  private val APP_NAME = "app"
+  private val MAIN_CLASS = "main"
+  private val APP_ARGS = Seq("arg1", "arg2")
+  private val RESOLVED_JAVA_OPTIONS = Map(
+    "conf1key" -> "conf1value",
+    "conf2key" -> "conf2value")
+  private val BUILT_DRIVER_POD =
+    new PodBuilder()
+      .withNewMetadata()
+        .withName(POD_NAME)
+        .endMetadata()
+      .withNewSpec()
+        .withHostname("localhost")
+        .endSpec()
+      .build()
+  private val BUILT_DRIVER_CONTAINER = new ContainerBuilder().withName(CONTAINER_NAME).build()
+  private val ADDITIONAL_RESOURCES = Seq(
+    new SecretBuilder().withNewMetadata().withName("secret").endMetadata().build())
+
+  private val BUILT_KUBERNETES_SPEC = KubernetesDriverSpec(
+    SparkPod(BUILT_DRIVER_POD, BUILT_DRIVER_CONTAINER),
+    ADDITIONAL_RESOURCES,
+    RESOLVED_JAVA_OPTIONS)
+
+  private val FULL_EXPECTED_CONTAINER = new ContainerBuilder(BUILT_DRIVER_CONTAINER)
+    .addNewEnv()
+      .withName(ENV_SPARK_CONF_DIR)
+      .withValue(SPARK_CONF_DIR_INTERNAL)
+      .endEnv()
+    .addNewVolumeMount()
+      .withName(SPARK_CONF_VOLUME)
+      .withMountPath(SPARK_CONF_DIR_INTERNAL)
+      .endVolumeMount()
+    .build()
+  private val FULL_EXPECTED_POD = new PodBuilder(BUILT_DRIVER_POD)
+    .editSpec()
+      .addToContainers(FULL_EXPECTED_CONTAINER)
+      .addNewVolume()
+        .withName(SPARK_CONF_VOLUME)
+        .withNewConfigMap().withName(s"$KUBERNETES_RESOURCE_PREFIX-driver-conf-map").endConfigMap()
+        .endVolume()
+      .endSpec()
+    .build()
+
+  private val POD_WITH_OWNER_REFERENCE = new PodBuilder(FULL_EXPECTED_POD)
+    .editMetadata()
+      .withUid(DRIVER_POD_UID)
+      .endMetadata()
+    .withApiVersion(DRIVER_POD_API_VERSION)
+    .withKind(DRIVER_POD_KIND)
+    .build()
+
+  private val ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES = ADDITIONAL_RESOURCES.map { secret =>
+    new SecretBuilder(secret)
+      .editMetadata()
+        .addNewOwnerReference()
+          .withName(POD_NAME)
+          .withApiVersion(DRIVER_POD_API_VERSION)
+          .withKind(DRIVER_POD_KIND)
+          .withController(true)
+          .withUid(DRIVER_POD_UID)
+          .endOwnerReference()
+        .endMetadata()
+      .build()
+  }
 
   private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
       HasMetadata, Boolean]
@@ -57,112 +120,85 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
 
   @Mock
+  private var driverBuilder: KubernetesDriverBuilder = _
+
+  @Mock
   private var resourceList: ResourceList = _
 
-  private val submissionSteps = Seq(FirstTestConfigurationStep, SecondTestConfigurationStep)
+  private var kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf] = _
+
+  private var sparkConf: SparkConf = _
   private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _
   private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _
-  private var createdContainerArgumentCaptor: ArgumentCaptor[Container] = _
 
   before {
     MockitoAnnotations.initMocks(this)
+    sparkConf = new SparkConf(false)
+    kubernetesConf = KubernetesConf[KubernetesDriverSpecificConf](
+      sparkConf,
+      KubernetesDriverSpecificConf(None, MAIN_CLASS, APP_NAME, APP_ARGS),
+      KUBERNETES_RESOURCE_PREFIX,
+      APP_ID,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
     when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withName(FirstTestConfigurationStep.podName)).thenReturn(namedPods)
+    when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
 
     createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod])
     createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
-    when(podOperations.create(createdPodArgumentCaptor.capture())).thenAnswer(new Answer[Pod] {
-      override def answer(invocation: InvocationOnMock): Pod = {
-        new PodBuilder(invocation.getArgumentAt(0, classOf[Pod]))
-          .editMetadata()
-            .withUid(DRIVER_POD_UID)
-            .endMetadata()
-          .withApiVersion(DRIVER_POD_API_VERSION)
-          .withKind(DRIVER_POD_KIND)
-          .build()
-      }
-    })
-    when(podOperations.withName(FirstTestConfigurationStep.podName)).thenReturn(namedPods)
+    when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
     when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
     doReturn(resourceList)
       .when(kubernetesClient)
       .resourceList(createdResourcesArgumentCaptor.capture())
   }
 
-  test("The client should configure the pod with the submission steps.") {
+  test("The client should configure the pod using the builder.") {
     val submissionClient = new Client(
-      submissionSteps,
-      new SparkConf(false),
+      driverBuilder,
+      kubernetesConf,
       kubernetesClient,
       false,
       "spark",
       loggingPodStatusWatcher,
       KUBERNETES_RESOURCE_PREFIX)
     submissionClient.run()
-    val createdPod = createdPodArgumentCaptor.getValue
-    assert(createdPod.getMetadata.getName === FirstTestConfigurationStep.podName)
-    assert(createdPod.getMetadata.getLabels.asScala ===
-      Map(FirstTestConfigurationStep.labelKey -> FirstTestConfigurationStep.labelValue))
-    assert(createdPod.getMetadata.getAnnotations.asScala ===
-      Map(SecondTestConfigurationStep.annotationKey ->
-        SecondTestConfigurationStep.annotationValue))
-    assert(createdPod.getSpec.getContainers.size() === 1)
-    assert(createdPod.getSpec.getContainers.get(0).getName ===
-      SecondTestConfigurationStep.containerName)
+    verify(podOperations).create(FULL_EXPECTED_POD)
   }
 
   test("The client should create Kubernetes resources") {
-    val EXAMPLE_JAVA_OPTS = "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails"
-    val EXPECTED_JAVA_OPTS = "-XX\\:+HeapDumpOnOutOfMemoryError -XX\\:+PrintGCDetails"
     val submissionClient = new Client(
-      submissionSteps,
-      new SparkConf(false)
-        .set(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS, EXAMPLE_JAVA_OPTS),
+      driverBuilder,
+      kubernetesConf,
       kubernetesClient,
       false,
       "spark",
       loggingPodStatusWatcher,
       KUBERNETES_RESOURCE_PREFIX)
     submissionClient.run()
-    val createdPod = createdPodArgumentCaptor.getValue
     val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
     assert(otherCreatedResources.size === 2)
-    val secrets = otherCreatedResources.toArray
-      .filter(_.isInstanceOf[Secret]).map(_.asInstanceOf[Secret])
+    val secrets = otherCreatedResources.toArray.filter(_.isInstanceOf[Secret]).toSeq
+    assert(secrets === ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES)
     val configMaps = otherCreatedResources.toArray
       .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap])
     assert(secrets.nonEmpty)
-    val secret = secrets.head
-    assert(secret.getMetadata.getName === FirstTestConfigurationStep.secretName)
-    assert(secret.getData.asScala ===
-      Map(FirstTestConfigurationStep.secretKey -> FirstTestConfigurationStep.secretData))
-    val ownerReference = Iterables.getOnlyElement(secret.getMetadata.getOwnerReferences)
-    assert(ownerReference.getName === createdPod.getMetadata.getName)
-    assert(ownerReference.getKind === DRIVER_POD_KIND)
-    assert(ownerReference.getUid === DRIVER_POD_UID)
-    assert(ownerReference.getApiVersion === DRIVER_POD_API_VERSION)
     assert(configMaps.nonEmpty)
     val configMap = configMaps.head
     assert(configMap.getMetadata.getName ===
       s"$KUBERNETES_RESOURCE_PREFIX-driver-conf-map")
     assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME))
-    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains(EXPECTED_JAVA_OPTS))
-    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains(
-      "spark.custom-conf=custom-conf-value"))
-    val driverContainer = Iterables.getOnlyElement(createdPod.getSpec.getContainers)
-    assert(driverContainer.getName === SecondTestConfigurationStep.containerName)
-    val driverEnv = driverContainer.getEnv.asScala.head
-    assert(driverEnv.getName === ENV_SPARK_CONF_DIR)
-    assert(driverEnv.getValue === SPARK_CONF_DIR_INTERNAL)
-    val driverMount = driverContainer.getVolumeMounts.asScala.head
-    assert(driverMount.getName === SPARK_CONF_VOLUME)
-    assert(driverMount.getMountPath === SPARK_CONF_DIR_INTERNAL)
+    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value"))
+    assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value"))
   }
 
   test("Waiting for app completion should stall on the watcher") {
     val submissionClient = new Client(
-      submissionSteps,
-      new SparkConf(false),
+      driverBuilder,
+      kubernetesConf,
       kubernetesClient,
       true,
       "spark",
@@ -171,56 +207,4 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     submissionClient.run()
     verify(loggingPodStatusWatcher).awaitCompletion()
   }
-
-}
-
-private object FirstTestConfigurationStep extends DriverConfigurationStep {
-
-  val podName = "test-pod"
-  val secretName = "test-secret"
-  val labelKey = "first-submit"
-  val labelValue = "true"
-  val secretKey = "secretKey"
-  val secretData = "secretData"
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val modifiedPod = new PodBuilder(driverSpec.driverPod)
-      .editMetadata()
-      .withName(podName)
-      .addToLabels(labelKey, labelValue)
-      .endMetadata()
-      .build()
-    val additionalResource = new SecretBuilder()
-      .withNewMetadata()
-      .withName(secretName)
-      .endMetadata()
-      .addToData(secretKey, secretData)
-      .build()
-    driverSpec.copy(
-      driverPod = modifiedPod,
-      otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(additionalResource))
-  }
-}
-
-private object SecondTestConfigurationStep extends DriverConfigurationStep {
-  val annotationKey = "second-submit"
-  val annotationValue = "submitted"
-  val sparkConfKey = "spark.custom-conf"
-  val sparkConfValue = "custom-conf-value"
-  val containerName = "driverContainer"
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val modifiedPod = new PodBuilder(driverSpec.driverPod)
-      .editMetadata()
-        .addToAnnotations(annotationKey, annotationValue)
-        .endMetadata()
-      .build()
-    val resolvedSparkConf = driverSpec.driverSparkConf.clone().set(sparkConfKey, sparkConfValue)
-    val modifiedContainer = new ContainerBuilder(driverSpec.driverContainer)
-      .withName(containerName)
-      .build()
-    driverSpec.copy(
-      driverPod = modifiedPod,
-      driverSparkConf = resolvedSparkConf,
-      driverContainer = modifiedContainer)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
deleted file mode 100644
index df34d2d..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.submit.steps._
-
-class DriverConfigOrchestratorSuite extends SparkFunSuite {
-
-  private val DRIVER_IMAGE = "driver-image"
-  private val IC_IMAGE = "init-container-image"
-  private val APP_ID = "spark-app-id"
-  private val KUBERNETES_RESOURCE_PREFIX = "example-prefix"
-  private val APP_NAME = "spark"
-  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
-  private val APP_ARGS = Array("arg1", "arg2")
-  private val SECRET_FOO = "foo"
-  private val SECRET_BAR = "bar"
-  private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
-
-  test("Base submission steps with a main app resource.") {
-    val sparkConf = new SparkConf(false).set(CONTAINER_IMAGE, DRIVER_IMAGE)
-    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
-    val orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Some(mainAppResource),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    validateStepTypes(
-      orchestrator,
-      classOf[BasicDriverConfigurationStep],
-      classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep],
-      classOf[DependencyResolutionStep])
-  }
-
-  test("Base submission steps without a main app resource.") {
-    val sparkConf = new SparkConf(false).set(CONTAINER_IMAGE, DRIVER_IMAGE)
-    val orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Option.empty,
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    validateStepTypes(
-      orchestrator,
-      classOf[BasicDriverConfigurationStep],
-      classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep])
-  }
-
-  test("Submission steps with driver secrets to mount") {
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, DRIVER_IMAGE)
-      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
-      .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
-    val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
-    val orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Some(mainAppResource),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    validateStepTypes(
-      orchestrator,
-      classOf[BasicDriverConfigurationStep],
-      classOf[DriverServiceBootstrapStep],
-      classOf[DriverKubernetesCredentialsStep],
-      classOf[DependencyResolutionStep],
-      classOf[DriverMountSecretsStep])
-  }
-
-  test("Submission using client local dependencies") {
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, DRIVER_IMAGE)
-    var orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Some(JavaMainAppResource("file:///var/apps/jars/main.jar")),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    assertThrows[SparkException] {
-      orchestrator.getAllConfigurationSteps
-    }
-
-    sparkConf.set("spark.files", "/path/to/file1,/path/to/file2")
-    orchestrator = new DriverConfigOrchestrator(
-      APP_ID,
-      KUBERNETES_RESOURCE_PREFIX,
-      Some(JavaMainAppResource("local:///var/apps/jars/main.jar")),
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    assertThrows[SparkException] {
-      orchestrator.getAllConfigurationSteps
-    }
-  }
-
-  private def validateStepTypes(
-      orchestrator: DriverConfigOrchestrator,
-      types: Class[_ <: DriverConfigurationStep]*): Unit = {
-    val steps = orchestrator.getAllConfigurationSteps
-    assert(steps.size === types.size)
-    assert(steps.map(_.getClass) === types)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
new file mode 100644
index 0000000..161f9af
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, MountSecretsFeatureStep}
+
+class KubernetesDriverBuilderSuite extends SparkFunSuite {
+
+  private val BASIC_STEP_TYPE = "basic"
+  private val CREDENTIALS_STEP_TYPE = "credentials"
+  private val SERVICE_STEP_TYPE = "service"
+  private val SECRETS_STEP_TYPE = "mount-secrets"
+
+  private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
+
+  private val credentialsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    CREDENTIALS_STEP_TYPE, classOf[DriverKubernetesCredentialsFeatureStep])
+
+  private val serviceStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    SERVICE_STEP_TYPE, classOf[DriverServiceFeatureStep])
+
+  private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
+
+  private val builderUnderTest: KubernetesDriverBuilder =
+    new KubernetesDriverBuilder(
+      _ => basicFeatureStep,
+      _ => credentialsStep,
+      _ => serviceStep,
+      _ => secretsStep)
+
+  test("Apply fundamental steps all the time.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        None,
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE)
+  }
+
+  test("Apply secrets step if secrets are present.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        None,
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map("secret" -> "secretMountPath"),
+      Map.empty)
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      SECRETS_STEP_TYPE)
+  }
+
+  private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
+    : Unit = {
+    assert(resolvedSpec.systemProperties.size === stepTypes.size)
+    stepTypes.foreach { stepType =>
+      assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType)
+      assert(resolvedSpec.driverKubernetesResources.containsSlice(
+        KubernetesFeaturesTestUtils.getSecretsForStepType(stepType)))
+      assert(resolvedSpec.systemProperties(stepType) === stepType)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
deleted file mode 100644
index ee450ff..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-class BasicDriverConfigurationStepSuite extends SparkFunSuite {
-
-  private val APP_ID = "spark-app-id"
-  private val RESOURCE_NAME_PREFIX = "spark"
-  private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
-  private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
-  private val APP_NAME = "spark-test"
-  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
-  private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
-  private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
-  private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
-  private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
-  private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"
-
-  test("Set all possible configurations from the user.") {
-    val sparkConf = new SparkConf()
-      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
-      .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar")
-      .set("spark.driver.cores", "2")
-      .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
-      .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
-      .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
-      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
-      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
-      .set(IMAGE_PULL_SECRETS, "imagePullSecret1, imagePullSecret2")
-
-    val submissionStep = new BasicDriverConfigurationStep(
-      APP_ID,
-      RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      CONTAINER_IMAGE_PULL_POLICY,
-      APP_NAME,
-      MAIN_CLASS,
-      APP_ARGS,
-      sparkConf)
-    val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
-    val baseDriverSpec = KubernetesDriverSpec(
-      driverPod = basePod,
-      driverContainer = new ContainerBuilder().build(),
-      driverSparkConf = new SparkConf(false),
-      otherKubernetesResources = Seq.empty[HasMetadata])
-    val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)
-
-    assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
-    assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
-    assert(preparedDriverSpec.driverContainer.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
-
-    assert(preparedDriverSpec.driverContainer.getEnv.size === 4)
-    val envs = preparedDriverSpec.driverContainer
-      .getEnv
-      .asScala
-      .map(env => (env.getName, env.getValue))
-      .toMap
-    assert(envs(ENV_CLASSPATH) === "/opt/spark/spark-examples.jar")
-    assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
-    assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
-
-    assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar =>
-      envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
-        envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
-        envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
-
-    val resourceRequirements = preparedDriverSpec.driverContainer.getResources
-    val requests = resourceRequirements.getRequests.asScala
-    assert(requests("cpu").getAmount === "2")
-    assert(requests("memory").getAmount === "456Mi")
-    val limits = resourceRequirements.getLimits.asScala
-    assert(limits("memory").getAmount === "456Mi")
-    assert(limits("cpu").getAmount === "4")
-
-    val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata
-    assert(driverPodMetadata.getName === "spark-driver-pod")
-    assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
-    val expectedAnnotations = Map(
-      CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
-      SPARK_APP_NAME_ANNOTATION -> APP_NAME)
-    assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
-
-    val driverPodSpec = preparedDriverSpec.driverPod.getSpec
-    assert(driverPodSpec.getRestartPolicy === "Never")
-    assert(driverPodSpec.getImagePullSecrets.size() === 2)
-    assert(driverPodSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1")
-    assert(driverPodSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2")
-
-    val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
-    val expectedSparkConf = Map(
-      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
-      "spark.app.id" -> APP_ID,
-      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
-      "spark.kubernetes.submitInDriver" -> "true")
-    assert(resolvedSparkConf === expectedSparkConf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
deleted file mode 100644
index ca43fc9..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-class DependencyResolutionStepSuite extends SparkFunSuite {
-
-  private val SPARK_JARS = Seq(
-    "apps/jars/jar1.jar",
-    "local:///var/apps/jars/jar2.jar")
-
-  private val SPARK_FILES = Seq(
-    "apps/files/file1.txt",
-    "local:///var/apps/files/file2.txt")
-
-  test("Added dependencies should be resolved in Spark configuration and environment") {
-    val dependencyResolutionStep = new DependencyResolutionStep(
-      SPARK_JARS,
-      SPARK_FILES)
-    val driverPod = new PodBuilder().build()
-    val baseDriverSpec = KubernetesDriverSpec(
-      driverPod = driverPod,
-      driverContainer = new ContainerBuilder().build(),
-      driverSparkConf = new SparkConf(false),
-      otherKubernetesResources = Seq.empty[HasMetadata])
-    val preparedDriverSpec = dependencyResolutionStep.configureDriver(baseDriverSpec)
-    assert(preparedDriverSpec.driverPod === driverPod)
-    assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
-    val resolvedSparkJars = preparedDriverSpec.driverSparkConf.get("spark.jars").split(",").toSet
-    val expectedResolvedSparkJars = Set(
-      "apps/jars/jar1.jar",
-      "/var/apps/jars/jar2.jar")
-    assert(resolvedSparkJars === expectedResolvedSparkJars)
-    val resolvedSparkFiles = preparedDriverSpec.driverSparkConf.get("spark.files").split(",").toSet
-    val expectedResolvedSparkFiles = Set(
-      "apps/files/file1.txt",
-      "/var/apps/files/file2.txt")
-    assert(resolvedSparkFiles === expectedResolvedSparkFiles)
-    val driverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
-    assert(driverEnv.size === 1)
-    assert(driverEnv.head.getName === ENV_MOUNTED_CLASSPATH)
-    val resolvedDriverClasspath = driverEnv.head.getValue.split(File.pathSeparator).toSet
-    val expectedResolvedDriverClasspath = expectedResolvedSparkJars
-    assert(resolvedDriverClasspath === expectedResolvedDriverClasspath)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala
deleted file mode 100644
index 64553d2..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStepSuite.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-
-import com.google.common.base.Charsets
-import com.google.common.io.{BaseEncoding, Files}
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret}
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.util.Utils
-
-class DriverKubernetesCredentialsStepSuite extends SparkFunSuite with BeforeAndAfter {
-
-  private val KUBERNETES_RESOURCE_NAME_PREFIX = "spark"
-  private var credentialsTempDirectory: File = _
-  private val BASE_DRIVER_SPEC = new KubernetesDriverSpec(
-    driverPod = new PodBuilder().build(),
-    driverContainer = new ContainerBuilder().build(),
-    driverSparkConf = new SparkConf(false),
-    otherKubernetesResources = Seq.empty[HasMetadata])
-
-  before {
-    credentialsTempDirectory = Utils.createTempDir()
-  }
-
-  after {
-    credentialsTempDirectory.delete()
-  }
-
-  test("Don't set any credentials") {
-    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
-        new SparkConf(false), KUBERNETES_RESOURCE_NAME_PREFIX)
-    val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(BASE_DRIVER_SPEC)
-    assert(preparedDriverSpec.driverPod === BASE_DRIVER_SPEC.driverPod)
-    assert(preparedDriverSpec.driverContainer === BASE_DRIVER_SPEC.driverContainer)
-    assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
-    assert(preparedDriverSpec.driverSparkConf.getAll.isEmpty)
-  }
-
-  test("Only set credentials that are manually mounted.") {
-    val submissionSparkConf = new SparkConf(false)
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
-        "/mnt/secrets/my-token.txt")
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
-        "/mnt/secrets/my-key.pem")
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
-        "/mnt/secrets/my-cert.pem")
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
-        "/mnt/secrets/my-ca.pem")
-
-    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
-      submissionSparkConf, KUBERNETES_RESOURCE_NAME_PREFIX)
-    val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(BASE_DRIVER_SPEC)
-    assert(preparedDriverSpec.driverPod === BASE_DRIVER_SPEC.driverPod)
-    assert(preparedDriverSpec.driverContainer === BASE_DRIVER_SPEC.driverContainer)
-    assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
-    assert(preparedDriverSpec.driverSparkConf.getAll.toMap === submissionSparkConf.getAll.toMap)
-  }
-
-  test("Mount credentials from the submission client as a secret.") {
-    val caCertFile = writeCredentials("ca.pem", "ca-cert")
-    val clientKeyFile = writeCredentials("key.pem", "key")
-    val clientCertFile = writeCredentials("cert.pem", "cert")
-    val submissionSparkConf = new SparkConf(false)
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX",
-        "token")
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
-        clientKeyFile.getAbsolutePath)
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
-        clientCertFile.getAbsolutePath)
-      .set(
-        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
-        caCertFile.getAbsolutePath)
-    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
-      submissionSparkConf, KUBERNETES_RESOURCE_NAME_PREFIX)
-    val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(
-      BASE_DRIVER_SPEC.copy(driverSparkConf = submissionSparkConf))
-    val expectedSparkConf = Map(
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX" -> "<present_but_redacted>",
-      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX" ->
-        DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH,
-      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
-        DRIVER_CREDENTIALS_CLIENT_KEY_PATH,
-      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
-        DRIVER_CREDENTIALS_CLIENT_CERT_PATH,
-      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
-        DRIVER_CREDENTIALS_CA_CERT_PATH,
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
-        clientKeyFile.getAbsolutePath,
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
-        clientCertFile.getAbsolutePath,
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
-        caCertFile.getAbsolutePath)
-    assert(preparedDriverSpec.driverSparkConf.getAll.toMap === expectedSparkConf)
-    assert(preparedDriverSpec.otherKubernetesResources.size === 1)
-    val credentialsSecret = preparedDriverSpec.otherKubernetesResources.head.asInstanceOf[Secret]
-    assert(credentialsSecret.getMetadata.getName ===
-      s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials")
-    val decodedSecretData = credentialsSecret.getData.asScala.map { data =>
-      (data._1, new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8))
-    }
-    val expectedSecretData = Map(
-      DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME -> "ca-cert",
-      DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME -> "token",
-      DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME -> "key",
-      DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME -> "cert")
-    assert(decodedSecretData === expectedSecretData)
-    val driverPodVolumes = preparedDriverSpec.driverPod.getSpec.getVolumes.asScala
-    assert(driverPodVolumes.size === 1)
-    assert(driverPodVolumes.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
-    assert(driverPodVolumes.head.getSecret != null)
-    assert(driverPodVolumes.head.getSecret.getSecretName === credentialsSecret.getMetadata.getName)
-    val driverContainerVolumeMount = preparedDriverSpec.driverContainer.getVolumeMounts.asScala
-    assert(driverContainerVolumeMount.size === 1)
-    assert(driverContainerVolumeMount.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
-    assert(driverContainerVolumeMount.head.getMountPath === DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
-  }
-
-  private def writeCredentials(credentialsFileName: String, credentialsContents: String): File = {
-    val credentialsFile = new File(credentialsTempDirectory, credentialsFileName)
-    Files.write(credentialsContents, credentialsFile, Charsets.UTF_8)
-    credentialsFile
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala
deleted file mode 100644
index 960d0bd..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-class DriverMountSecretsStepSuite extends SparkFunSuite {
-
-  private val SECRET_FOO = "foo"
-  private val SECRET_BAR = "bar"
-  private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
-
-  test("mounts all given secrets") {
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(new SparkConf(false))
-    val secretNamesToMountPaths = Map(
-      SECRET_FOO -> SECRET_MOUNT_PATH,
-      SECRET_BAR -> SECRET_MOUNT_PATH)
-
-    val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
-    val mountSecretsStep = new DriverMountSecretsStep(mountSecretsBootstrap)
-    val configuredDriverSpec = mountSecretsStep.configureDriver(baseDriverSpec)
-    val driverPodWithSecretsMounted = configuredDriverSpec.driverPod
-    val driverContainerWithSecretsMounted = configuredDriverSpec.driverContainer
-
-    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
-      assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName))
-    }
-    Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
-      assert(SecretVolumeUtils.containerHasVolume(
-        driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
deleted file mode 100644
index 78c8c3b..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.Service
-import org.mockito.{Mock, MockitoAnnotations}
-import org.mockito.Mockito.when
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.util.Clock
-
-class DriverServiceBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter {
-
-  private val SHORT_RESOURCE_NAME_PREFIX =
-    "a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH -
-      DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length)
-
-  private val LONG_RESOURCE_NAME_PREFIX =
-    "a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH -
-      DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length + 1)
-  private val DRIVER_LABELS = Map(
-    "label1key" -> "label1value",
-    "label2key" -> "label2value")
-
-  @Mock
-  private var clock: Clock = _
-
-  private var sparkConf: SparkConf = _
-
-  before {
-    MockitoAnnotations.initMocks(this)
-    sparkConf = new SparkConf(false)
-  }
-
-  test("Headless service has a port for the driver RPC and the block manager.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      SHORT_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf
-        .set("spark.driver.port", "9000")
-        .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080),
-      clock)
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
-    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
-    assert(resolvedDriverSpec.otherKubernetesResources.size === 1)
-    assert(resolvedDriverSpec.otherKubernetesResources.head.isInstanceOf[Service])
-    val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
-    verifyService(
-      9000,
-      8080,
-      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}",
-      driverService)
-  }
-
-  test("Hostname and ports are set according to the service name.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      SHORT_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf
-        .set("spark.driver.port", "9000")
-        .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
-        .set(KUBERNETES_NAMESPACE, "my-namespace"),
-      clock)
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
-    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
-    val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
-      DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX
-    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
-    verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
-  }
-
-  test("Ports should resolve to defaults in SparkConf and in the service.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      SHORT_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf,
-      clock)
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
-    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
-    verifyService(
-      DEFAULT_DRIVER_PORT,
-      DEFAULT_BLOCKMANAGER_PORT,
-      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}",
-      resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service])
-    assert(resolvedDriverSpec.driverSparkConf.get("spark.driver.port") ===
-      DEFAULT_DRIVER_PORT.toString)
-    assert(resolvedDriverSpec.driverSparkConf.get(
-      org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT) === DEFAULT_BLOCKMANAGER_PORT)
-  }
-
-  test("Long prefixes should switch to using a generated name.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      LONG_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
-      clock)
-    when(clock.getTimeMillis()).thenReturn(10000)
-    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
-    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
-    val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
-    val expectedServiceName = s"spark-10000${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}"
-    assert(driverService.getMetadata.getName === expectedServiceName)
-    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
-    verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
-  }
-
-  test("Disallow bind address and driver host to be set explicitly.") {
-    val configurationStep = new DriverServiceBootstrapStep(
-      LONG_RESOURCE_NAME_PREFIX,
-      DRIVER_LABELS,
-      sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
-      clock)
-    try {
-      configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
-      fail("The driver bind address should not be allowed.")
-    } catch {
-      case e: Throwable =>
-        assert(e.getMessage ===
-          s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_BIND_ADDRESS_KEY} is" +
-          " not supported in Kubernetes mode, as the driver's bind address is managed" +
-          " and set to the driver pod's IP address.")
-    }
-    sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS)
-    sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host")
-    try {
-      configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
-      fail("The driver host address should not be allowed.")
-    } catch {
-      case e: Throwable =>
-        assert(e.getMessage ===
-          s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_HOST_KEY} is" +
-          " not supported in Kubernetes mode, as the driver's hostname will be managed via" +
-          " a Kubernetes service.")
-    }
-  }
-
-  private def verifyService(
-      driverPort: Int,
-      blockManagerPort: Int,
-      expectedServiceName: String,
-      service: Service): Unit = {
-    assert(service.getMetadata.getName === expectedServiceName)
-    assert(service.getSpec.getClusterIP === "None")
-    assert(service.getSpec.getSelector.asScala === DRIVER_LABELS)
-    assert(service.getSpec.getPorts.size() === 2)
-    val driverServicePorts = service.getSpec.getPorts.asScala
-    assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
-    assert(driverServicePorts.head.getPort.intValue() === driverPort)
-    assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort)
-    assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
-    assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
-    assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
-  }
-
-  private def verifySparkConfHostNames(
-      driverSparkConf: SparkConf, expectedHostName: String): Unit = {
-    assert(driverSparkConf.get(
-      org.apache.spark.internal.config.DRIVER_HOST_ADDRESS) === expectedHostName)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
deleted file mode 100644
index d73df20..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.scheduler.cluster.k8s
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model._
-import org.mockito.MockitoAnnotations
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-
-class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
-
-  private val driverPodName: String = "driver-pod"
-  private val driverPodUid: String = "driver-uid"
-  private val executorPrefix: String = "base"
-  private val executorImage: String = "executor-image"
-  private val imagePullSecrets: String = "imagePullSecret1, imagePullSecret2"
-  private val driverPod = new PodBuilder()
-    .withNewMetadata()
-    .withName(driverPodName)
-    .withUid(driverPodUid)
-    .endMetadata()
-    .withNewSpec()
-    .withNodeName("some-node")
-    .endSpec()
-    .withNewStatus()
-    .withHostIP("192.168.99.100")
-    .endStatus()
-    .build()
-  private var baseConf: SparkConf = _
-
-  before {
-    MockitoAnnotations.initMocks(this)
-    baseConf = new SparkConf()
-      .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
-      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
-      .set(CONTAINER_IMAGE, executorImage)
-      .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
-      .set(IMAGE_PULL_SECRETS, imagePullSecrets)
-  }
-
-  test("basic executor pod has reasonable defaults") {
-    val factory = new ExecutorPodFactory(baseConf, None)
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
-
-    // The executor pod name and default labels.
-    assert(executor.getMetadata.getName === s"$executorPrefix-exec-1")
-    assert(executor.getMetadata.getLabels.size() === 3)
-    assert(executor.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) === "1")
-
-    // There is exactly 1 container with no volume mounts and default memory limits and requests.
-    // Default memory limit/request is 1024M + 384M (minimum overhead constant).
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getImage === executorImage)
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.isEmpty)
-    assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getResources
-      .getRequests.get("memory").getAmount === "1408Mi")
-    assert(executor.getSpec.getContainers.get(0).getResources
-      .getLimits.get("memory").getAmount === "1408Mi")
-    assert(executor.getSpec.getImagePullSecrets.size() === 2)
-    assert(executor.getSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1")
-    assert(executor.getSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2")
-
-    // The pod has no node selector, volumes.
-    assert(executor.getSpec.getNodeSelector.isEmpty)
-    assert(executor.getSpec.getVolumes.isEmpty)
-
-    checkEnv(executor, Map())
-    checkOwnerReferences(executor, driverPodUid)
-  }
-
-  test("executor core request specification") {
-    var factory = new ExecutorPodFactory(baseConf, None)
-    var executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
-      === "1")
-
-    val conf = baseConf.clone()
-
-    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "0.1")
-    factory = new ExecutorPodFactory(conf, None)
-    executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
-      === "0.1")
-
-    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
-    factory = new ExecutorPodFactory(conf, None)
-    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
-    executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
-    assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
-      === "100m")
-  }
-
-  test("executor pod hostnames get truncated to 63 characters") {
-    val conf = baseConf.clone()
-    conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
-      "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")
-
-    val factory = new ExecutorPodFactory(conf, None)
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
-
-    assert(executor.getSpec.getHostname.length === 63)
-  }
-
-  test("classpath and extra java options get translated into environment variables") {
-    val conf = baseConf.clone()
-    conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
-    conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
-
-    val factory = new ExecutorPodFactory(conf, None)
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]())
-
-    checkEnv(executor,
-      Map("SPARK_JAVA_OPT_0" -> "foo=bar",
-        ENV_CLASSPATH -> "bar=baz",
-        "qux" -> "quux"))
-    checkOwnerReferences(executor, driverPodUid)
-  }
-
-  test("executor secrets get mounted") {
-    val conf = baseConf.clone()
-
-    val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
-    val factory = new ExecutorPodFactory(conf, Some(secretsBootstrap))
-    val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
-
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
-      === "secret1-volume")
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
-      .getMountPath === "/var/secret1")
-
-    // check volume mounted.
-    assert(executor.getSpec.getVolumes.size() === 1)
-    assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1")
-
-    checkOwnerReferences(executor, driverPodUid)
-  }
-
-  // There is always exactly one controller reference, and it points to the driver pod.
-  private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
-    assert(executor.getMetadata.getOwnerReferences.size() === 1)
-    assert(executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid)
-    assert(executor.getMetadata.getOwnerReferences.get(0).getController === true)
-  }
-
-  // Check that the expected environment variables are present.
-  private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = {
-    val defaultEnvs = Map(
-      ENV_EXECUTOR_ID -> "1",
-      ENV_DRIVER_URL -> "dummy",
-      ENV_EXECUTOR_CORES -> "1",
-      ENV_EXECUTOR_MEMORY -> "1g",
-      ENV_APPLICATION_ID -> "dummy",
-      ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
-      ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
-
-    assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getEnv.size() === defaultEnvs.size)
-    val mapEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map {
-      x => (x.getName, x.getValue)
-    }.toMap
-    assert(defaultEnvs === mapEnvs)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index b2f26f2..96065e8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -18,11 +18,12 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
 
-import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, DoneablePod, Pod, PodBuilder, PodList}
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
-import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
+import org.hamcrest.{BaseMatcher, Description, Matcher}
+import org.mockito.{AdditionalAnswers, ArgumentCaptor, Matchers, Mock, MockitoAnnotations}
 import org.mockito.Matchers.{any, eq => mockitoEq}
 import org.mockito.Mockito.{doNothing, never, times, verify, when}
 import org.scalatest.BeforeAndAfter
@@ -31,6 +32,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent.Future
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.rpc._
@@ -47,8 +49,6 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private val SPARK_DRIVER_HOST = "localhost"
   private val SPARK_DRIVER_PORT = 7077
   private val POD_ALLOCATION_INTERVAL = "1m"
-  private val DRIVER_URL = RpcEndpointAddress(
-    SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
   private val FIRST_EXECUTOR_POD = new PodBuilder()
     .withNewMetadata()
       .withName("pod1")
@@ -94,7 +94,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private var requestExecutorsService: ExecutorService = _
 
   @Mock
-  private var executorPodFactory: ExecutorPodFactory = _
+  private var executorBuilder: KubernetesExecutorBuilder = _
 
   @Mock
   private var kubernetesClient: KubernetesClient = _
@@ -399,7 +399,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     new KubernetesClusterSchedulerBackend(
       taskSchedulerImpl,
       rpcEnv,
-      executorPodFactory,
+      executorBuilder,
       kubernetesClient,
       allocatorExecutor,
       requestExecutorsService) {
@@ -428,13 +428,22 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
         .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
         .endMetadata()
       .build()
-    when(executorPodFactory.createExecutorPod(
-      executorId.toString,
-      APP_ID,
-      DRIVER_URL,
-      sparkConf.getExecutorEnv,
-      driverPod,
-      Map.empty)).thenReturn(resolvedPod)
-    resolvedPod
+    val resolvedContainer = new ContainerBuilder().build()
+    when(executorBuilder.buildFromFeatures(Matchers.argThat(
+      new BaseMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
+        override def matches(argument: scala.Any)
+          : Boolean = {
+          argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] &&
+            argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
+              .roleSpecificConf.executorId == executorId.toString
+        }
+
+        override def describeTo(description: Description): Unit = {}
+      }))).thenReturn(SparkPod(resolvedPod, resolvedContainer))
+    new PodBuilder(resolvedPod)
+      .editSpec()
+        .addToContainers(resolvedContainer)
+        .endSpec()
+      .build()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
new file mode 100644
index 0000000..f527062
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeaturesTestUtils, MountSecretsFeatureStep}
+
+class KubernetesExecutorBuilderSuite extends SparkFunSuite {
+  private val BASIC_STEP_TYPE = "basic"
+  private val SECRETS_STEP_TYPE = "mount-secrets"
+
+  private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
+  private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
+
+  private val builderUnderTest = new KubernetesExecutorBuilder(
+    _ => basicFeatureStep,
+    _ => mountSecretsStep)
+
+  test("Basic steps are consistently applied.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesExecutorSpecificConf(
+        "executor-id", new PodBuilder().build()),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    validateStepTypesApplied(builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE)
+  }
+
+  test("Apply secrets step if secrets are present.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesExecutorSpecificConf(
+        "executor-id", new PodBuilder().build()),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map("secret" -> "secretMountPath"),
+      Map.empty)
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      SECRETS_STEP_TYPE)
+  }
+
+  private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = {
+    assert(resolvedPod.pod.getMetadata.getLabels.size === stepTypes.size)
+    stepTypes.foreach { stepType =>
+      assert(resolvedPod.pod.getMetadata.getLabels.get(stepType) === stepType)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[3/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

Posted by fo...@apache.org.
[SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

## What changes were proposed in this pull request?

Breaks down the construction of driver pods and executor pods in a way that uses a common abstraction for both spark-submit creating the driver and KubernetesClusterSchedulerBackend creating the executor. Encourages more code reuse and is more legible than the older approach.

The high-level design is discussed in more detail on the JIRA ticket. This pull request is the implementation of that design with some minor changes in the implementation details.

No user-facing behavior should break as a result of this change.

## How was this patch tested?

Migrated all unit tests from the old submission steps architecture to the new architecture. Integration tests should not have to change and pass given that this shouldn't change any outward behavior.

Author: mcheah <mc...@palantir.com>

Closes #20910 from mccheah/spark-22839-incremental.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a83ae0d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a83ae0d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a83ae0d9

Branch: refs/heads/master
Commit: a83ae0d9bc1b8f4909b9338370efe4020079bea7
Parents: 0323e61
Author: mcheah <mc...@palantir.com>
Authored: Fri Apr 13 08:43:58 2018 -0700
Committer: Anirudh Ramanathan <ra...@google.com>
Committed: Fri Apr 13 08:43:58 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/k8s/Config.scala    |   2 +-
 .../spark/deploy/k8s/KubernetesConf.scala       | 184 ++++++++++++++
 .../spark/deploy/k8s/KubernetesDriverSpec.scala |  31 +++
 .../spark/deploy/k8s/KubernetesUtils.scala      |  11 -
 .../deploy/k8s/MountSecretsBootstrap.scala      |  72 ------
 .../org/apache/spark/deploy/k8s/SparkPod.scala  |  34 +++
 .../k8s/features/BasicDriverFeatureStep.scala   | 136 ++++++++++
 .../k8s/features/BasicExecutorFeatureStep.scala | 179 ++++++++++++++
 ...DriverKubernetesCredentialsFeatureStep.scala | 216 ++++++++++++++++
 .../k8s/features/DriverServiceFeatureStep.scala |  97 ++++++++
 .../features/KubernetesFeatureConfigStep.scala  |  71 ++++++
 .../k8s/features/MountSecretsFeatureStep.scala  |  62 +++++
 .../k8s/submit/DriverConfigOrchestrator.scala   | 145 -----------
 .../submit/KubernetesClientApplication.scala    |  80 +++---
 .../k8s/submit/KubernetesDriverBuilder.scala    |  56 +++++
 .../k8s/submit/KubernetesDriverSpec.scala       |  47 ----
 .../steps/BasicDriverConfigurationStep.scala    | 163 ------------
 .../submit/steps/DependencyResolutionStep.scala |  61 -----
 .../submit/steps/DriverConfigurationStep.scala  |  30 ---
 .../steps/DriverKubernetesCredentialsStep.scala | 245 -------------------
 .../submit/steps/DriverMountSecretsStep.scala   |  38 ---
 .../steps/DriverServiceBootstrapStep.scala      | 104 --------
 .../cluster/k8s/ExecutorPodFactory.scala        | 227 -----------------
 .../cluster/k8s/KubernetesClusterManager.scala  |  12 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala |  20 +-
 .../cluster/k8s/KubernetesExecutorBuilder.scala |  41 ++++
 .../spark/deploy/k8s/KubernetesConfSuite.scala  | 175 +++++++++++++
 .../spark/deploy/k8s/KubernetesUtilsTest.scala  |  36 ---
 .../features/BasicDriverFeatureStepSuite.scala  | 153 ++++++++++++
 .../BasicExecutorFeatureStepSuite.scala         | 179 ++++++++++++++
 ...rKubernetesCredentialsFeatureStepSuite.scala | 174 +++++++++++++
 .../DriverServiceFeatureStepSuite.scala         | 227 +++++++++++++++++
 .../features/KubernetesFeaturesTestUtils.scala  |  61 +++++
 .../features/MountSecretsFeatureStepSuite.scala |  58 +++++
 .../spark/deploy/k8s/submit/ClientSuite.scala   | 216 ++++++++--------
 .../submit/DriverConfigOrchestratorSuite.scala  | 131 ----------
 .../submit/KubernetesDriverBuilderSuite.scala   | 102 ++++++++
 .../BasicDriverConfigurationStepSuite.scala     | 122 ---------
 .../steps/DependencyResolutionStepSuite.scala   |  69 ------
 .../DriverKubernetesCredentialsStepSuite.scala  | 153 ------------
 .../steps/DriverMountSecretsStepSuite.scala     |  49 ----
 .../steps/DriverServiceBootstrapStepSuite.scala | 180 --------------
 .../cluster/k8s/ExecutorPodFactorySuite.scala   | 195 ---------------
 ...KubernetesClusterSchedulerBackendSuite.scala |  37 +--
 .../k8s/KubernetesExecutorBuilderSuite.scala    |  75 ++++++
 45 files changed, 2482 insertions(+), 2274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 82f6c71..4086970 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -167,5 +167,5 @@ private[spark] object Config extends Logging {
   val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
   val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
 
-  val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
+  val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
new file mode 100644
index 0000000..77b634d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
+import org.apache.spark.internal.config.ConfigEntry
+
+private[spark] sealed trait KubernetesRoleSpecificConf
+
+/*
+ * Structure containing metadata for Kubernetes logic that builds a Spark driver.
+ */
+private[spark] case class KubernetesDriverSpecificConf(
+    mainAppResource: Option[MainAppResource],
+    mainClass: String,
+    appName: String,
+    appArgs: Seq[String]) extends KubernetesRoleSpecificConf
+
+/*
+ * Structure containing metadata for Kubernetes logic that builds a Spark executor.
+ */
+private[spark] case class KubernetesExecutorSpecificConf(
+    executorId: String,
+    driverPod: Pod)
+  extends KubernetesRoleSpecificConf
+
+/**
+ * Structure containing metadata for Kubernetes logic to build Spark pods.
+ */
+private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
+    sparkConf: SparkConf,
+    roleSpecificConf: T,
+    appResourceNamePrefix: String,
+    appId: String,
+    roleLabels: Map[String, String],
+    roleAnnotations: Map[String, String],
+    roleSecretNamesToMountPaths: Map[String, String],
+    roleEnvs: Map[String, String]) {
+
+  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+
+  def sparkJars(): Seq[String] = sparkConf
+    .getOption("spark.jars")
+    .map(str => str.split(",").toSeq)
+    .getOrElse(Seq.empty[String])
+
+  def sparkFiles(): Seq[String] = sparkConf
+    .getOption("spark.files")
+    .map(str => str.split(",").toSeq)
+    .getOrElse(Seq.empty[String])
+
+  def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def imagePullSecrets(): Seq[LocalObjectReference] = {
+    sparkConf
+      .get(IMAGE_PULL_SECRETS)
+      .map(_.split(","))
+      .getOrElse(Array.empty[String])
+      .map(_.trim)
+      .map { secret =>
+        new LocalObjectReferenceBuilder().withName(secret).build()
+      }
+  }
+
+  def nodeSelector(): Map[String, String] =
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
+
+  def get(conf: String): String = sparkConf.get(conf)
+
+  def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)
+
+  def getOption(key: String): Option[String] = sparkConf.getOption(key)
+}
+
+private[spark] object KubernetesConf {
+  def createDriverConf(
+      sparkConf: SparkConf,
+      appName: String,
+      appResourceNamePrefix: String,
+      appId: String,
+      mainAppResource: Option[MainAppResource],
+      mainClass: String,
+      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+    val sparkConfWithMainAppJar = sparkConf.clone()
+    mainAppResource.foreach {
+      case JavaMainAppResource(res) =>
+        val previousJars = sparkConf
+          .getOption("spark.jars")
+          .map(_.split(","))
+          .getOrElse(Array.empty)
+        if (!previousJars.contains(res)) {
+          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+        }
+    }
+
+    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
+    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
+      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
+      "operations.")
+    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
+      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
+      "operations.")
+    val driverLabels = driverCustomLabels ++ Map(
+      SPARK_APP_ID_LABEL -> appId,
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+    val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+    val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
+    val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
+
+    KubernetesConf(
+      sparkConfWithMainAppJar,
+      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
+      appResourceNamePrefix,
+      appId,
+      driverLabels,
+      driverAnnotations,
+      driverSecretNamesToMountPaths,
+      driverEnvs)
+  }
+
+  def createExecutorConf(
+      sparkConf: SparkConf,
+      executorId: String,
+      appId: String,
+      driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = {
+    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
+    require(
+      !executorCustomLabels.contains(SPARK_APP_ID_LABEL),
+      s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
+    require(
+      !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+      s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
+        " Spark.")
+    require(
+      !executorCustomLabels.contains(SPARK_ROLE_LABEL),
+      s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
+    val executorLabels = Map(
+      SPARK_EXECUTOR_ID_LABEL -> executorId,
+      SPARK_APP_ID_LABEL -> appId,
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
+      executorCustomLabels
+    val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+    val executorSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+    val executorEnv = sparkConf.getExecutorEnv.toMap
+
+    KubernetesConf(
+      sparkConf.clone(),
+      KubernetesExecutorSpecificConf(executorId, driverPod),
+      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
+      appId,
+      executorLabels,
+      executorAnnotations,
+      executorSecrets,
+      executorEnv)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
new file mode 100644
index 0000000..0c5ae02
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+private[spark] case class KubernetesDriverSpec(
+    pod: SparkPod,
+    driverKubernetesResources: Seq[HasMetadata],
+    systemProperties: Map[String, String])
+
+private[spark] object KubernetesDriverSpec {
+  def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
+    SparkPod.initialPod(),
+    Seq.empty,
+    initialProps)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 5b2bb81..ee62906 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -37,17 +37,6 @@ private[spark] object KubernetesUtils {
     sparkConf.getAllWithPrefix(prefix).toMap
   }
 
-  /**
-   * Parses comma-separated list of imagePullSecrets into K8s-understandable format
-   */
-  def parseImagePullSecrets(imagePullSecrets: Option[String]): List[LocalObjectReference] = {
-    imagePullSecrets match {
-      case Some(secretsCommaSeparated) =>
-        secretsCommaSeparated.split(',').map(_.trim).map(new LocalObjectReference(_)).toList
-      case None => Nil
-    }
-  }
-
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala
deleted file mode 100644
index c35e7db..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
-
-/**
- * Bootstraps a driver or executor container or an init-container with needed secrets mounted.
- */
-private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {
-
-  /**
-   * Add new secret volumes for the secrets specified in secretNamesToMountPaths into the given pod.
-   *
-   * @param pod the pod into which the secret volumes are being added.
-   * @return the updated pod with the secret volumes added.
-   */
-  def addSecretVolumes(pod: Pod): Pod = {
-    var podBuilder = new PodBuilder(pod)
-    secretNamesToMountPaths.keys.foreach { name =>
-      podBuilder = podBuilder
-        .editOrNewSpec()
-          .addNewVolume()
-            .withName(secretVolumeName(name))
-            .withNewSecret()
-              .withSecretName(name)
-              .endSecret()
-            .endVolume()
-          .endSpec()
-    }
-
-    podBuilder.build()
-  }
-
-  /**
-   * Mounts Kubernetes secret volumes of the secrets specified in secretNamesToMountPaths into the
-   * given container.
-   *
-   * @param container the container into which the secret volumes are being mounted.
-   * @return the updated container with the secrets mounted.
-   */
-  def mountSecrets(container: Container): Container = {
-    var containerBuilder = new ContainerBuilder(container)
-    secretNamesToMountPaths.foreach { case (name, path) =>
-      containerBuilder = containerBuilder
-        .addNewVolumeMount()
-          .withName(secretVolumeName(name))
-          .withMountPath(path)
-          .endVolumeMount()
-    }
-
-    containerBuilder.build()
-  }
-
-  private def secretVolumeName(secretName: String): String = {
-    secretName + "-volume"
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
new file mode 100644
index 0000000..345dd11
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
+
+private[spark] case class SparkPod(pod: Pod, container: Container)
+
+private[spark] object SparkPod {
+  def initialPod(): SparkPod = {
+    SparkPod(
+      new PodBuilder()
+        .withNewMetadata()
+        .endMetadata()
+        .withNewSpec()
+        .endSpec()
+        .build(),
+      new ContainerBuilder().build())
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
new file mode 100644
index 0000000..07bdccb
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] class BasicDriverFeatureStep(
+    conf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  private val driverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(s"${conf.appResourceNamePrefix}-driver")
+
+  private val driverContainerImage = conf
+    .get(DRIVER_CONTAINER_IMAGE)
+    .getOrElse(throw new SparkException("Must specify the driver container image"))
+
+  // CPU settings
+  private val driverCpuCores = conf.get("spark.driver.cores", "1")
+  private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
+  private val memoryOverheadMiB = conf
+    .get(DRIVER_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+  private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val driverCustomEnvs = conf.roleEnvs
+      .toSeq
+      .map { env =>
+        new EnvVarBuilder()
+          .withName(env._1)
+          .withValue(env._2)
+          .build()
+      }
+
+    val driverCpuQuantity = new QuantityBuilder(false)
+      .withAmount(driverCpuCores)
+      .build()
+    val driverMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverMemoryWithOverheadMiB}Mi")
+      .build()
+    val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
+      ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
+    }
+
+    val driverContainer = new ContainerBuilder(pod.container)
+      .withName(DRIVER_CONTAINER_NAME)
+      .withImage(driverContainerImage)
+      .withImagePullPolicy(conf.imagePullPolicy())
+      .addAllToEnv(driverCustomEnvs.asJava)
+      .addNewEnv()
+        .withName(ENV_DRIVER_BIND_ADDRESS)
+        .withValueFrom(new EnvVarSourceBuilder()
+          .withNewFieldRef("v1", "status.podIP")
+          .build())
+        .endEnv()
+      .withNewResources()
+        .addToRequests("cpu", driverCpuQuantity)
+        .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
+        .addToRequests("memory", driverMemoryQuantity)
+        .addToLimits("memory", driverMemoryQuantity)
+        .endResources()
+      .addToArgs("driver")
+      .addToArgs("--properties-file", SPARK_CONF_PATH)
+      .addToArgs("--class", conf.roleSpecificConf.mainClass)
+      // The user application jar is merged into the spark.jars list and managed through that
+      // property, so there is no need to reference it explicitly here.
+      .addToArgs(SparkLauncher.NO_RESOURCE)
+      .addToArgs(conf.roleSpecificConf.appArgs: _*)
+      .build()
+
+    val driverPod = new PodBuilder(pod.pod)
+      .editOrNewMetadata()
+        .withName(driverPodName)
+        .addToLabels(conf.roleLabels.asJava)
+        .addToAnnotations(conf.roleAnnotations.asJava)
+        .endMetadata()
+      .withNewSpec()
+        .withRestartPolicy("Never")
+        .withNodeSelector(conf.nodeSelector().asJava)
+        .addToImagePullSecrets(conf.imagePullSecrets(): _*)
+        .endSpec()
+      .build()
+    SparkPod(driverPod, driverContainer)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    val additionalProps = mutable.Map(
+      KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
+      "spark.app.id" -> conf.appId,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix,
+      KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true")
+
+    val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
+      conf.sparkJars())
+    val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
+      conf.sparkFiles())
+    if (resolvedSparkJars.nonEmpty) {
+      additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
+    }
+    if (resolvedSparkFiles.nonEmpty) {
+      additionalProps.put("spark.files", resolvedSparkFiles.mkString(","))
+    }
+    additionalProps.toMap
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
new file mode 100644
index 0000000..d220975
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class BasicExecutorFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
+  private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH)
+  private val executorContainerImage = kubernetesConf
+    .get(EXECUTOR_CONTAINER_IMAGE)
+    .getOrElse(throw new SparkException("Must specify the executor container image"))
+  private val blockManagerPort = kubernetesConf
+    .sparkConf
+    .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+  private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
+
+  private val driverUrl = RpcEndpointAddress(
+    kubernetesConf.get("spark.driver.host"),
+    kubernetesConf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
+  private val executorMemoryString = kubernetesConf.get(
+    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = kubernetesConf
+    .get(EXECUTOR_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
+
+  private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
+  private val executorCoresRequest =
+    if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
+      kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
+    } else {
+      executorCores.toString
+    }
+  private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val name = s"$executorPodNamePrefix-exec-${kubernetesConf.roleSpecificConf.executorId}"
+
+    // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
+    // name as the hostname.  This preserves uniqueness since the end of name contains
+    // executorId
+    val hostname = name.substring(Math.max(0, name.length - 63))
+    val executorMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(s"${executorMemoryWithOverhead}Mi")
+      .build()
+    val executorCpuQuantity = new QuantityBuilder(false)
+      .withAmount(executorCoresRequest)
+      .build()
+    val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
+      new EnvVarBuilder()
+        .withName(ENV_CLASSPATH)
+        .withValue(cp)
+        .build()
+    }
+    val executorExtraJavaOptionsEnv = kubernetesConf
+      .get(EXECUTOR_JAVA_OPTIONS)
+      .map { opts =>
+        val delimitedOpts = Utils.splitCommandString(opts)
+        delimitedOpts.zipWithIndex.map {
+          case (opt, index) =>
+            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+        }
+      }.getOrElse(Seq.empty[EnvVar])
+    val executorEnv = (Seq(
+      (ENV_DRIVER_URL, driverUrl),
+      (ENV_EXECUTOR_CORES, executorCores.toString),
+      (ENV_EXECUTOR_MEMORY, executorMemoryString),
+      (ENV_APPLICATION_ID, kubernetesConf.appId),
+      // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
+      (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
+      (ENV_EXECUTOR_ID, kubernetesConf.roleSpecificConf.executorId)) ++
+      kubernetesConf.roleEnvs)
+      .map(env => new EnvVarBuilder()
+        .withName(env._1)
+        .withValue(env._2)
+        .build()
+      ) ++ Seq(
+      new EnvVarBuilder()
+        .withName(ENV_EXECUTOR_POD_IP)
+        .withValueFrom(new EnvVarSourceBuilder()
+          .withNewFieldRef("v1", "status.podIP")
+          .build())
+        .build()
+    ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
+    val requiredPorts = Seq(
+      (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
+      .map { case (name, port) =>
+        new ContainerPortBuilder()
+          .withName(name)
+          .withContainerPort(port)
+          .build()
+      }
+
+    val executorContainer = new ContainerBuilder(pod.container)
+      .withName("executor")
+      .withImage(executorContainerImage)
+      .withImagePullPolicy(kubernetesConf.imagePullPolicy())
+      .withNewResources()
+        .addToRequests("memory", executorMemoryQuantity)
+        .addToLimits("memory", executorMemoryQuantity)
+        .addToRequests("cpu", executorCpuQuantity)
+        .endResources()
+      .addAllToEnv(executorEnv.asJava)
+      .withPorts(requiredPorts.asJava)
+      .addToArgs("executor")
+      .build()
+    val containerWithLimitCores = executorLimitCores.map { limitCores =>
+      val executorCpuLimitQuantity = new QuantityBuilder(false)
+        .withAmount(limitCores)
+        .build()
+      new ContainerBuilder(executorContainer)
+        .editResources()
+          .addToLimits("cpu", executorCpuLimitQuantity)
+          .endResources()
+        .build()
+    }.getOrElse(executorContainer)
+    val driverPod = kubernetesConf.roleSpecificConf.driverPod
+    val executorPod = new PodBuilder(pod.pod)
+      .editOrNewMetadata()
+        .withName(name)
+        .withLabels(kubernetesConf.roleLabels.asJava)
+        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+        .withOwnerReferences()
+        .addNewOwnerReference()
+          .withController(true)
+          .withApiVersion(driverPod.getApiVersion)
+          .withKind(driverPod.getKind)
+          .withName(driverPod.getMetadata.getName)
+          .withUid(driverPod.getMetadata.getUid)
+          .endOwnerReference()
+        .endMetadata()
+      .editOrNewSpec()
+        .withHostname(hostname)
+        .withRestartPolicy("Never")
+        .withNodeSelector(kubernetesConf.nodeSelector().asJava)
+        .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
+        .endSpec()
+      .build()
+    SparkPod(executorPod, containerWithLimitCores)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
new file mode 100644
index 0000000..ff5ad66
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf[_])
+  extends KubernetesFeatureConfigStep {
+  // TODO clean up this class, and credentials in general. See also SparkKubernetesClientFactory.
+  // We should use a struct to hold all creds-related fields. A lot of the code is very repetitive.
+
+  private val maybeMountedOAuthTokenFile = kubernetesConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = kubernetesConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = kubernetesConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = kubernetesConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = kubernetesConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  private val oauthTokenBase64 = kubernetesConf
+    .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+    .map { token =>
+      BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+    }
+
+  private val caCertDataBase64 = safeFileConfToBase64(
+    s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+    "Driver CA cert file")
+  private val clientKeyDataBase64 = safeFileConfToBase64(
+    s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+    "Driver client key file")
+  private val clientCertDataBase64 = safeFileConfToBase64(
+    s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+    "Driver client cert file")
+
+  // TODO decide whether or not to apply this step entirely in the caller, i.e. the builder.
+  private val shouldMountSecret = oauthTokenBase64.isDefined ||
+    caCertDataBase64.isDefined ||
+    clientKeyDataBase64.isDefined ||
+    clientCertDataBase64.isDefined
+
+  private val driverCredentialsSecretName =
+    s"${kubernetesConf.appResourceNamePrefix}-kubernetes-credentials"
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    if (!shouldMountSecret) {
+      pod.copy(
+        pod = driverServiceAccount.map { account =>
+          new PodBuilder(pod.pod)
+            .editOrNewSpec()
+              .withServiceAccount(account)
+              .withServiceAccountName(account)
+              .endSpec()
+            .build()
+        }.getOrElse(pod.pod))
+    } else {
+      val driverPodWithMountedKubernetesCredentials =
+        new PodBuilder(pod.pod)
+          .editOrNewSpec()
+            .addNewVolume()
+              .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+              .withNewSecret().withSecretName(driverCredentialsSecretName).endSecret()
+              .endVolume()
+          .endSpec()
+          .build()
+
+      val driverContainerWithMountedSecretVolume =
+        new ContainerBuilder(pod.container)
+          .addNewVolumeMount()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
+            .endVolumeMount()
+          .build()
+      SparkPod(driverPodWithMountedKubernetesCredentials, driverContainerWithMountedSecretVolume)
+    }
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    val resolvedMountedOAuthTokenFile = resolveSecretLocation(
+      maybeMountedOAuthTokenFile,
+      oauthTokenBase64,
+      DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
+    val resolvedMountedClientKeyFile = resolveSecretLocation(
+      maybeMountedClientKeyFile,
+      clientKeyDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
+    val resolvedMountedClientCertFile = resolveSecretLocation(
+      maybeMountedClientCertFile,
+      clientCertDataBase64,
+      DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
+    val resolvedMountedCaCertFile = resolveSecretLocation(
+      maybeMountedCaCertFile,
+      caCertDataBase64,
+      DRIVER_CREDENTIALS_CA_CERT_PATH)
+
+    val redactedTokens = kubernetesConf.sparkConf.getAll
+      .filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX))
+      .toMap
+      .mapValues( _ => "<present_but_redacted>")
+    redactedTokens ++
+      resolvedMountedCaCertFile.map { file =>
+        Map(
+          s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
+            file)
+      }.getOrElse(Map.empty) ++
+      resolvedMountedClientKeyFile.map { file =>
+        Map(
+          s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
+            file)
+      }.getOrElse(Map.empty) ++
+      resolvedMountedClientCertFile.map { file =>
+        Map(
+          s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
+            file)
+      }.getOrElse(Map.empty) ++
+      resolvedMountedOAuthTokenFile.map { file =>
+        Map(
+          s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX" ->
+          file)
+      }.getOrElse(Map.empty)
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    if (shouldMountSecret) {
+      Seq(createCredentialsSecret())
+    } else {
+      Seq.empty
+    }
+  }
+
+  private def safeFileConfToBase64(conf: String, fileType: String): Option[String] = {
+    kubernetesConf.getOption(conf)
+      .map(new File(_))
+      .map { file =>
+        require(file.isFile, String.format("%s provided at %s does not exist or is not a file.",
+          fileType, file.getAbsolutePath))
+        BaseEncoding.base64().encode(Files.toByteArray(file))
+      }
+  }
+
+  /**
+   * Resolve a Kubernetes secret data entry from an optional client credential used by the
+   * driver to talk to the Kubernetes API server.
+   *
+   * @param userSpecifiedCredential the optional user-specified client credential.
+   * @param secretName name of the Kubernetes secret storing the client credential.
+   * @return a secret data entry in the form of a map from the secret name to the secret data,
+   *         which may be empty if the user-specified credential is empty.
+   */
+  private def resolveSecretData(
+    userSpecifiedCredential: Option[String],
+    secretName: String): Map[String, String] = {
+    userSpecifiedCredential.map { valueBase64 =>
+      Map(secretName -> valueBase64)
+    }.getOrElse(Map.empty[String, String])
+  }
+
+  private def resolveSecretLocation(
+    mountedUserSpecified: Option[String],
+    valueMountedFromSubmitter: Option[String],
+    mountedCanonicalLocation: String): Option[String] = {
+    mountedUserSpecified.orElse(valueMountedFromSubmitter.map { _ =>
+      mountedCanonicalLocation
+    })
+  }
+
+  private def createCredentialsSecret(): Secret = {
+    val allSecretData =
+      resolveSecretData(
+        clientKeyDataBase64,
+        DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
+        resolveSecretData(
+          clientCertDataBase64,
+          DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
+        resolveSecretData(
+          caCertDataBase64,
+          DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
+        resolveSecretData(
+          oauthTokenBase64,
+          DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
+
+    new SecretBuilder()
+      .withNewMetadata()
+        .withName(driverCredentialsSecretName)
+        .endMetadata()
+      .withData(allSecretData.asJava)
+      .build()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
new file mode 100644
index 0000000..f2d7bbd
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, SystemClock}
+
+private[spark] class DriverServiceFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
+    clock: Clock = new SystemClock)
+  extends KubernetesFeatureConfigStep with Logging {
+  import DriverServiceFeatureStep._
+
+  require(kubernetesConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+    s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " +
+      "address is managed and set to the driver pod's IP address.")
+  require(kubernetesConf.getOption(DRIVER_HOST_KEY).isEmpty,
+    s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
+      "managed via a Kubernetes service.")
+
+  private val preferredServiceName = s"${kubernetesConf.appResourceNamePrefix}$DRIVER_SVC_POSTFIX"
+  private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
+    preferredServiceName
+  } else {
+    val randomServiceId = clock.getTimeMillis()
+    val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+    logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
+      s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
+      s"$shorterServiceName as the driver service's name.")
+    shorterServiceName
+  }
+
+  private val driverPort = kubernetesConf.sparkConf.getInt(
+    "spark.driver.port", DEFAULT_DRIVER_PORT)
+  private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt(
+    org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
+
+  override def configurePod(pod: SparkPod): SparkPod = pod
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
+    Map(DRIVER_HOST_KEY -> driverHostname,
+      "spark.driver.port" -> driverPort.toString,
+      org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key ->
+        driverBlockManagerPort.toString)
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    val driverService = new ServiceBuilder()
+      .withNewMetadata()
+        .withName(resolvedServiceName)
+        .endMetadata()
+      .withNewSpec()
+        .withClusterIP("None")
+        .withSelector(kubernetesConf.roleLabels.asJava)
+        .addNewPort()
+          .withName(DRIVER_PORT_NAME)
+          .withPort(driverPort)
+          .withNewTargetPort(driverPort)
+          .endPort()
+        .addNewPort()
+          .withName(BLOCK_MANAGER_PORT_NAME)
+          .withPort(driverBlockManagerPort)
+          .withNewTargetPort(driverBlockManagerPort)
+          .endPort()
+        .endSpec()
+      .build()
+    Seq(driverService)
+  }
+}
+
+private[spark] object DriverServiceFeatureStep {
+  val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
+  val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
+  val DRIVER_SVC_POSTFIX = "-driver-svc"
+  val MAX_SERVICE_NAME_LENGTH = 63
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
new file mode 100644
index 0000000..4c1be3b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.SparkPod
+
+/**
+ * A collection of functions that together represent a "feature" in pods that are launched for
+ * Spark drivers and executors.
+ */
+private[spark] trait KubernetesFeatureConfigStep {
+
+  /**
+   * Apply modifications on the given pod in accordance to this feature. This can include attaching
+   * volumes, adding environment variables, and adding labels/annotations.
+   * <p>
+   * Note that we should return a SparkPod that keeps all of the properties of the passed SparkPod
+   * object. So this is correct:
+   * <pre>
+   * {@code val configuredPod = new PodBuilder(pod.pod)
+   *     .editSpec()
+   *     ...
+   *     .build()
+   *   val configuredContainer = new ContainerBuilder(pod.container)
+   *     ...
+   *     .build()
+   *   SparkPod(configuredPod, configuredContainer)
+   *  }
+   * </pre>
+   * This is incorrect:
+   * <pre>
+   * {@code val configuredPod = new PodBuilder() // Loses the original state
+   *     .editSpec()
+   *     ...
+   *     .build()
+   *   val configuredContainer = new ContainerBuilder() // Loses the original state
+   *     ...
+   *     .build()
+   *   SparkPod(configuredPod, configuredContainer)
+   *  }
+   * </pre>
+   */
+  def configurePod(pod: SparkPod): SparkPod
+
+  /**
+   * Return any system properties that should be set on the JVM in accordance to this feature.
+   */
+  def getAdditionalPodSystemProperties(): Map[String, String]
+
+  /**
+   * Return any additional Kubernetes resources that should be added to support this feature. Only
+   * applicable when creating the driver in cluster mode.
+   */
+  def getAdditionalKubernetesResources(): Seq[HasMetadata]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
new file mode 100644
index 0000000..97fa949
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+
+private[spark] class MountSecretsFeatureStep(
+    kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val addedVolumes = kubernetesConf
+      .roleSecretNamesToMountPaths
+      .keys
+      .map(secretName =>
+        new VolumeBuilder()
+          .withName(secretVolumeName(secretName))
+          .withNewSecret()
+            .withSecretName(secretName)
+            .endSecret()
+          .build())
+    val podWithVolumes = new PodBuilder(pod.pod)
+      .editOrNewSpec()
+        .addToVolumes(addedVolumes.toSeq: _*)
+        .endSpec()
+      .build()
+    val addedVolumeMounts = kubernetesConf
+      .roleSecretNamesToMountPaths
+      .map {
+        case (secretName, mountPath) =>
+          new VolumeMountBuilder()
+            .withName(secretVolumeName(secretName))
+            .withMountPath(mountPath)
+            .build()
+      }
+    val containerWithMounts = new ContainerBuilder(pod.container)
+      .addToVolumeMounts(addedVolumeMounts.toSeq: _*)
+      .build()
+    SparkPod(podWithVolumes, containerWithMounts)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+
+  private def secretVolumeName(secretName: String): String = s"$secretName-volume"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
deleted file mode 100644
index b4d3f04..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.steps._
-import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.util.SystemClock
-import org.apache.spark.util.Utils
-
-/**
- * Figures out and returns the complete ordered list of needed DriverConfigurationSteps to
- * configure the Spark driver pod. The returned steps will be applied one by one in the given
- * order to produce a final KubernetesDriverSpec that is used in KubernetesClientApplication
- * to construct and create the driver pod.
- */
-private[spark] class DriverConfigOrchestrator(
-    kubernetesAppId: String,
-    kubernetesResourceNamePrefix: String,
-    mainAppResource: Option[MainAppResource],
-    appName: String,
-    mainClass: String,
-    appArgs: Array[String],
-    sparkConf: SparkConf) {
-
-  // The resource name prefix is derived from the Spark application name, making it easy to connect
-  // the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
-  // application the user submitted.
-
-  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-
-  def getAllConfigurationSteps: Seq[DriverConfigurationStep] = {
-    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_DRIVER_LABEL_PREFIX)
-    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
-      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
-      "operations.")
-    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
-      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
-      "operations.")
-
-    val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_DRIVER_SECRETS_PREFIX)
-
-    val allDriverLabels = driverCustomLabels ++ Map(
-      SPARK_APP_ID_LABEL -> kubernetesAppId,
-      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
-
-    val initialSubmissionStep = new BasicDriverConfigurationStep(
-      kubernetesAppId,
-      kubernetesResourceNamePrefix,
-      allDriverLabels,
-      imagePullPolicy,
-      appName,
-      mainClass,
-      appArgs,
-      sparkConf)
-
-    val serviceBootstrapStep = new DriverServiceBootstrapStep(
-      kubernetesResourceNamePrefix,
-      allDriverLabels,
-      sparkConf,
-      new SystemClock)
-
-    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
-      sparkConf, kubernetesResourceNamePrefix)
-
-    val additionalMainAppJar = if (mainAppResource.nonEmpty) {
-       val mayBeResource = mainAppResource.get match {
-        case JavaMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
-          Some(resource)
-        case _ => None
-      }
-      mayBeResource
-    } else {
-      None
-    }
-
-    val sparkJars = sparkConf.getOption("spark.jars")
-      .map(_.split(","))
-      .getOrElse(Array.empty[String]) ++
-      additionalMainAppJar.toSeq
-    val sparkFiles = sparkConf.getOption("spark.files")
-      .map(_.split(","))
-      .getOrElse(Array.empty[String])
-
-    // TODO(SPARK-23153): remove once submission client local dependencies are supported.
-    if (existSubmissionLocalFiles(sparkJars) || existSubmissionLocalFiles(sparkFiles)) {
-      throw new SparkException("The Kubernetes mode does not yet support referencing application " +
-        "dependencies in the local file system.")
-    }
-
-    val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
-      Seq(new DependencyResolutionStep(
-        sparkJars,
-        sparkFiles))
-    } else {
-      Nil
-    }
-
-    val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
-      Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
-    } else {
-      Nil
-    }
-
-    Seq(
-      initialSubmissionStep,
-      serviceBootstrapStep,
-      kubernetesCredentialsStep) ++
-      dependencyResolutionStep ++
-      mountSecretsStep
-  }
-
-  private def existSubmissionLocalFiles(files: Seq[String]): Boolean = {
-    files.exists { uri =>
-      Utils.resolveURI(uri).getScheme == "file"
-    }
-  }
-
-  private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {
-    files.exists { uri =>
-      Utils.resolveURI(uri).getScheme != "local"
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index e16d1ad..a97f565 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -27,12 +27,10 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
-import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.util.Utils
 
 /**
@@ -43,9 +41,9 @@ import org.apache.spark.util.Utils
  * @param driverArgs arguments to the driver
  */
 private[spark] case class ClientArguments(
-     mainAppResource: Option[MainAppResource],
-     mainClass: String,
-     driverArgs: Array[String])
+    mainAppResource: Option[MainAppResource],
+    mainClass: String,
+    driverArgs: Array[String])
 
 private[spark] object ClientArguments {
 
@@ -80,8 +78,9 @@ private[spark] object ClientArguments {
  * watcher that monitors and logs the application status. Waits for the application to terminate if
  * spark.kubernetes.submission.waitAppCompletion is true.
  *
- * @param submissionSteps steps that collectively configure the driver
- * @param sparkConf the submission client Spark configuration
+ * @param builder Responsible for building the base driver pod based on a composition of
+ *                implemented features.
+ * @param kubernetesConf application configuration
  * @param kubernetesClient the client to talk to the Kubernetes API server
  * @param waitForAppCompletion a flag indicating whether the client should wait for the application
  *                             to complete
@@ -89,31 +88,21 @@ private[spark] object ClientArguments {
  * @param watcher a watcher that monitors and logs the application status
  */
 private[spark] class Client(
-    submissionSteps: Seq[DriverConfigurationStep],
-    sparkConf: SparkConf,
+    builder: KubernetesDriverBuilder,
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
     kubernetesClient: KubernetesClient,
     waitForAppCompletion: Boolean,
     appName: String,
     watcher: LoggingPodStatusWatcher,
     kubernetesResourceNamePrefix: String) extends Logging {
 
-   /**
-    * Run command that initializes a DriverSpec that will be updated after each
-    * DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
-    * will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
-    */
   def run(): Unit = {
-    var currentDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf)
-    // submissionSteps contain steps necessary to take, to resolve varying
-    // client arguments that are passed in, created by orchestrator
-    for (nextStep <- submissionSteps) {
-      currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
-    }
+    val resolvedDriverSpec = builder.buildFromFeatures(kubernetesConf)
     val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
-    val configMap = buildConfigMap(configMapName, currentDriverSpec.driverSparkConf)
+    val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
     // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
     // Spark command builder to pickup on the Java Options present in the ConfigMap
-    val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
+    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
       .addNewEnv()
         .withName(ENV_SPARK_CONF_DIR)
         .withValue(SPARK_CONF_DIR_INTERNAL)
@@ -123,7 +112,7 @@ private[spark] class Client(
         .withMountPath(SPARK_CONF_DIR_INTERNAL)
         .endVolumeMount()
       .build()
-    val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
+    val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
       .editSpec()
         .addToContainers(resolvedDriverContainer)
         .addNewVolume()
@@ -141,12 +130,10 @@ private[spark] class Client(
         .watch(watcher)) { _ =>
       val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
       try {
-        if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
-          val otherKubernetesResources =
-            currentDriverSpec.otherKubernetesResources ++ Seq(configMap)
-          addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-          kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-        }
+        val otherKubernetesResources =
+          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
       } catch {
         case NonFatal(e) =>
           kubernetesClient.pods().delete(createdDriverPod)
@@ -180,20 +167,17 @@ private[spark] class Client(
   }
 
   // Build a Config Map that will house spark conf properties in a single file for spark-submit
-  private def buildConfigMap(configMapName: String, conf: SparkConf): ConfigMap = {
+  private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
     val properties = new Properties()
-    conf.getAll.foreach { case (k, v) =>
+    conf.foreach { case (k, v) =>
       properties.setProperty(k, v)
     }
     val propertiesWriter = new StringWriter()
     properties.store(propertiesWriter,
       s"Java properties built from Kubernetes config map with name: $configMapName")
-
-    val namespace = conf.get(KUBERNETES_NAMESPACE)
     new ConfigMapBuilder()
       .withNewMetadata()
         .withName(configMapName)
-        .withNamespace(namespace)
         .endMetadata()
       .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
       .build()
@@ -211,7 +195,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
   }
 
   private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
-    val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
+    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
     // For constructing the app ID, we can't use the Spark application name, as the app ID is going
     // to be added as a label to group resources belonging to the same application. Label values are
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
@@ -219,10 +203,19 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
     val launchTime = System.currentTimeMillis()
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
-    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
     val kubernetesResourceNamePrefix = {
       s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
     }
+    val kubernetesConf = KubernetesConf.createDriverConf(
+      sparkConf,
+      appName,
+      kubernetesResourceNamePrefix,
+      kubernetesAppId,
+      clientArguments.mainAppResource,
+      clientArguments.mainClass,
+      clientArguments.driverArgs)
+    val builder = new KubernetesDriverBuilder
+    val namespace = kubernetesConf.namespace()
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
     val master = sparkConf.get("spark.master").substring("k8s://".length)
@@ -230,15 +223,6 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
 
     val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
 
-    val orchestrator = new DriverConfigOrchestrator(
-      kubernetesAppId,
-      kubernetesResourceNamePrefix,
-      clientArguments.mainAppResource,
-      appName,
-      clientArguments.mainClass,
-      clientArguments.driverArgs,
-      sparkConf)
-
     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,
       Some(namespace),
@@ -247,8 +231,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       None,
       None)) { kubernetesClient =>
         val client = new Client(
-          orchestrator.getAllConfigurationSteps,
-          sparkConf,
+          builder,
+          kubernetesConf,
           kubernetesClient,
           waitForAppCompletion,
           appName,

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
new file mode 100644
index 0000000..c7579ed
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountSecretsFeatureStep}
+
+private[spark] class KubernetesDriverBuilder(
+    provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
+      new BasicDriverFeatureStep(_),
+    provideCredentialsStep: (KubernetesConf[KubernetesDriverSpecificConf])
+      => DriverKubernetesCredentialsFeatureStep =
+      new DriverKubernetesCredentialsFeatureStep(_),
+    provideServiceStep: (KubernetesConf[KubernetesDriverSpecificConf]) => DriverServiceFeatureStep =
+      new DriverServiceFeatureStep(_),
+    provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+      => MountSecretsFeatureStep) =
+      new MountSecretsFeatureStep(_)) {
+
+  def buildFromFeatures(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
+    val baseFeatures = Seq(
+      provideBasicStep(kubernetesConf),
+      provideCredentialsStep(kubernetesConf),
+      provideServiceStep(kubernetesConf))
+    val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
+    } else baseFeatures
+
+    var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
+    for (feature <- allFeatures) {
+      val configuredPod = feature.configurePod(spec.pod)
+      val addedSystemProperties = feature.getAdditionalPodSystemProperties()
+      val addedResources = feature.getAdditionalKubernetesResources()
+      spec = KubernetesDriverSpec(
+        configuredPod,
+        spec.driverKubernetesResources ++ addedResources,
+        spec.systemProperties ++ addedSystemProperties)
+    }
+    spec
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala
deleted file mode 100644
index db13f09..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, HasMetadata, Pod, PodBuilder}
-
-import org.apache.spark.SparkConf
-
-/**
- * Represents the components and characteristics of a Spark driver. The driver can be considered
- * as being comprised of the driver pod itself, any other Kubernetes resources that the driver
- * pod depends on, and the SparkConf that should be supplied to the Spark application. The driver
- * container should be operated on via the specific field of this case class as opposed to trying
- * to edit the container directly on the pod. The driver container should be attached at the
- * end of executing all submission steps.
- */
-private[spark] case class KubernetesDriverSpec(
-    driverPod: Pod,
-    driverContainer: Container,
-    otherKubernetesResources: Seq[HasMetadata],
-    driverSparkConf: SparkConf)
-
-private[spark] object KubernetesDriverSpec {
-  def initialSpec(initialSparkConf: SparkConf): KubernetesDriverSpec = {
-    KubernetesDriverSpec(
-      // Set new metadata and a new spec so that submission steps can use
-      // PodBuilder#editMetadata() and/or PodBuilder#editSpec() safely.
-      new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
-      new ContainerBuilder().build(),
-      Seq.empty[HasMetadata],
-      initialSparkConf.clone())
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
deleted file mode 100644
index fcb1db8..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model._
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
-import org.apache.spark.launcher.SparkLauncher
-
-/**
- * Performs basic configuration for the driver pod.
- */
-private[spark] class BasicDriverConfigurationStep(
-    kubernetesAppId: String,
-    resourceNamePrefix: String,
-    driverLabels: Map[String, String],
-    imagePullPolicy: String,
-    appName: String,
-    mainClass: String,
-    appArgs: Array[String],
-    sparkConf: SparkConf) extends DriverConfigurationStep {
-
-  private val driverPodName = sparkConf
-    .get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(s"$resourceNamePrefix-driver")
-
-  private val driverExtraClasspath = sparkConf.get(DRIVER_CLASS_PATH)
-
-  private val driverContainerImage = sparkConf
-    .get(DRIVER_CONTAINER_IMAGE)
-    .getOrElse(throw new SparkException("Must specify the driver container image"))
-
-  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
-
-  // CPU settings
-  private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
-  private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
-
-  // Memory settings
-  private val driverMemoryMiB = sparkConf.get(DRIVER_MEMORY)
-  private val memoryOverheadMiB = sparkConf
-    .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
-  private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
-      new EnvVarBuilder()
-        .withName(ENV_CLASSPATH)
-        .withValue(classPath)
-        .build()
-    }
-
-    val driverCustomAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
-    require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
-      s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
-        " Spark bookkeeping operations.")
-
-    val driverCustomEnvs = sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
-      .map { env =>
-        new EnvVarBuilder()
-          .withName(env._1)
-          .withValue(env._2)
-          .build()
-      }
-
-    val driverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
-
-    val nodeSelector = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
-
-    val driverCpuQuantity = new QuantityBuilder(false)
-      .withAmount(driverCpuCores)
-      .build()
-    val driverMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(s"${driverMemoryWithOverheadMiB}Mi")
-      .build()
-    val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
-      ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
-    }
-
-    val driverContainerWithoutArgs = new ContainerBuilder(driverSpec.driverContainer)
-      .withName(DRIVER_CONTAINER_NAME)
-      .withImage(driverContainerImage)
-      .withImagePullPolicy(imagePullPolicy)
-      .addAllToEnv(driverCustomEnvs.asJava)
-      .addToEnv(driverExtraClasspathEnv.toSeq: _*)
-      .addNewEnv()
-        .withName(ENV_DRIVER_BIND_ADDRESS)
-        .withValueFrom(new EnvVarSourceBuilder()
-          .withNewFieldRef("v1", "status.podIP")
-          .build())
-        .endEnv()
-      .withNewResources()
-        .addToRequests("cpu", driverCpuQuantity)
-        .addToRequests("memory", driverMemoryQuantity)
-        .addToLimits("memory", driverMemoryQuantity)
-        .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
-        .endResources()
-      .addToArgs("driver")
-      .addToArgs("--properties-file", SPARK_CONF_PATH)
-      .addToArgs("--class", mainClass)
-      // The user application jar is merged into the spark.jars list and managed through that
-      // property, so there is no need to reference it explicitly here.
-      .addToArgs(SparkLauncher.NO_RESOURCE)
-
-    val driverContainer = appArgs.toList match {
-      case "" :: Nil | Nil => driverContainerWithoutArgs.build()
-      case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build()
-    }
-
-    val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
-
-    val baseDriverPod = new PodBuilder(driverSpec.driverPod)
-      .editOrNewMetadata()
-        .withName(driverPodName)
-        .addToLabels(driverLabels.asJava)
-        .addToAnnotations(driverAnnotations.asJava)
-      .endMetadata()
-      .withNewSpec()
-        .withRestartPolicy("Never")
-        .withNodeSelector(nodeSelector.asJava)
-        .withImagePullSecrets(parsedImagePullSecrets.asJava)
-        .endSpec()
-      .build()
-
-    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
-      .setIfMissing(KUBERNETES_DRIVER_POD_NAME, driverPodName)
-      .set("spark.app.id", kubernetesAppId)
-      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, resourceNamePrefix)
-      // to set the config variables to allow client-mode spark-submit from driver
-      .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
-
-    driverSpec.copy(
-      driverPod = baseDriverPod,
-      driverSparkConf = resolvedSparkConf,
-      driverContainer = driverContainer)
-  }
-
-}
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

Posted by fo...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
deleted file mode 100644
index 43de329..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.File
-
-import io.fabric8.kubernetes.api.model.ContainerBuilder
-
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * Step that configures the classpath, spark.jars, and spark.files for the driver given that the
- * user may provide remote files or files with local:// schemes.
- */
-private[spark] class DependencyResolutionStep(
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String]) extends DriverConfigurationStep {
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(sparkJars)
-    val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(sparkFiles)
-
-    val sparkConf = driverSpec.driverSparkConf.clone()
-    if (resolvedSparkJars.nonEmpty) {
-      sparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
-    }
-    if (resolvedSparkFiles.nonEmpty) {
-      sparkConf.set("spark.files", resolvedSparkFiles.mkString(","))
-    }
-    val resolvedDriverContainer = if (resolvedSparkJars.nonEmpty) {
-      new ContainerBuilder(driverSpec.driverContainer)
-        .addNewEnv()
-          .withName(ENV_MOUNTED_CLASSPATH)
-          .withValue(resolvedSparkJars.mkString(File.pathSeparator))
-          .endEnv()
-        .build()
-    } else {
-      driverSpec.driverContainer
-    }
-
-    driverSpec.copy(
-      driverContainer = resolvedDriverContainer,
-      driverSparkConf = sparkConf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
deleted file mode 100644
index 17614e0..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
-
-  /**
-   * Apply some transformation to the previous state of the driver to add a new feature to it.
-   */
-  def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
deleted file mode 100644
index 2424e63..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.File
-import java.nio.charset.StandardCharsets
-
-import scala.collection.JavaConverters._
-import scala.language.implicitConversions
-
-import com.google.common.io.{BaseEncoding, Files}
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
- * to request executors.
- */
-private[spark] class DriverKubernetesCredentialsStep(
-    submissionSparkConf: SparkConf,
-    kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
-
-  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
-    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
-  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
-    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
-  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
-    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
-  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
-    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
-  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val driverSparkConf = driverSpec.driverSparkConf.clone()
-
-    val oauthTokenBase64 = submissionSparkConf
-      .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
-      .map { token =>
-        BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
-      }
-    val caCertDataBase64 = safeFileConfToBase64(
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
-      "Driver CA cert file")
-    val clientKeyDataBase64 = safeFileConfToBase64(
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
-      "Driver client key file")
-    val clientCertDataBase64 = safeFileConfToBase64(
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
-      "Driver client cert file")
-
-    val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
-      driverSparkConf,
-      oauthTokenBase64,
-      caCertDataBase64,
-      clientKeyDataBase64,
-      clientCertDataBase64)
-
-    val kubernetesCredentialsSecret = createCredentialsSecret(
-      oauthTokenBase64,
-      caCertDataBase64,
-      clientKeyDataBase64,
-      clientCertDataBase64)
-
-    val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
-      new PodBuilder(driverSpec.driverPod)
-        .editOrNewSpec()
-          .addNewVolume()
-            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
-            .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
-            .endVolume()
-          .endSpec()
-        .build()
-    }.getOrElse(
-      driverServiceAccount.map { account =>
-        new PodBuilder(driverSpec.driverPod)
-          .editOrNewSpec()
-          .withServiceAccount(account)
-          .withServiceAccountName(account)
-          .endSpec()
-          .build()
-      }.getOrElse(driverSpec.driverPod)
-    )
-
-    val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { _ =>
-      new ContainerBuilder(driverSpec.driverContainer)
-        .addNewVolumeMount()
-          .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
-          .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
-          .endVolumeMount()
-        .build()
-    }.getOrElse(driverSpec.driverContainer)
-
-    driverSpec.copy(
-      driverPod = driverPodWithMountedKubernetesCredentials,
-      otherKubernetesResources =
-        driverSpec.otherKubernetesResources ++ kubernetesCredentialsSecret.toSeq,
-      driverSparkConf = driverSparkConfWithCredentialsLocations,
-      driverContainer = driverContainerWithMountedSecretVolume)
-  }
-
-  private def createCredentialsSecret(
-      driverOAuthTokenBase64: Option[String],
-      driverCaCertDataBase64: Option[String],
-      driverClientKeyDataBase64: Option[String],
-      driverClientCertDataBase64: Option[String]): Option[Secret] = {
-    val allSecretData =
-      resolveSecretData(
-        driverClientKeyDataBase64,
-        DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
-      resolveSecretData(
-        driverClientCertDataBase64,
-        DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
-      resolveSecretData(
-        driverCaCertDataBase64,
-        DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
-      resolveSecretData(
-        driverOAuthTokenBase64,
-        DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
-
-    if (allSecretData.isEmpty) {
-      None
-    } else {
-      Some(new SecretBuilder()
-        .withNewMetadata()
-          .withName(s"$kubernetesResourceNamePrefix-kubernetes-credentials")
-          .endMetadata()
-        .withData(allSecretData.asJava)
-        .build())
-    }
-  }
-
-  private def setDriverPodKubernetesCredentialLocations(
-      driverSparkConf: SparkConf,
-      driverOauthTokenBase64: Option[String],
-      driverCaCertDataBase64: Option[String],
-      driverClientKeyDataBase64: Option[String],
-      driverClientCertDataBase64: Option[String]): SparkConf = {
-    val resolvedMountedOAuthTokenFile = resolveSecretLocation(
-      maybeMountedOAuthTokenFile,
-      driverOauthTokenBase64,
-      DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
-    val resolvedMountedClientKeyFile = resolveSecretLocation(
-      maybeMountedClientKeyFile,
-      driverClientKeyDataBase64,
-      DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
-    val resolvedMountedClientCertFile = resolveSecretLocation(
-      maybeMountedClientCertFile,
-      driverClientCertDataBase64,
-      DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
-    val resolvedMountedCaCertFile = resolveSecretLocation(
-      maybeMountedCaCertFile,
-      driverCaCertDataBase64,
-      DRIVER_CREDENTIALS_CA_CERT_PATH)
-
-    val sparkConfWithCredentialLocations = driverSparkConf
-      .setOption(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
-        resolvedMountedCaCertFile)
-      .setOption(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
-        resolvedMountedClientKeyFile)
-      .setOption(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
-        resolvedMountedClientCertFile)
-      .setOption(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
-        resolvedMountedOAuthTokenFile)
-
-    // Redact all OAuth token values
-    sparkConfWithCredentialLocations
-      .getAll
-      .filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX)).map(_._1)
-      .foreach {
-        sparkConfWithCredentialLocations.set(_, "<present_but_redacted>")
-      }
-    sparkConfWithCredentialLocations
-  }
-
-  private def safeFileConfToBase64(conf: String, fileType: String): Option[String] = {
-    submissionSparkConf.getOption(conf)
-      .map(new File(_))
-      .map { file =>
-        require(file.isFile, String.format("%s provided at %s does not exist or is not a file.",
-          fileType, file.getAbsolutePath))
-        BaseEncoding.base64().encode(Files.toByteArray(file))
-      }
-  }
-
-  private def resolveSecretLocation(
-      mountedUserSpecified: Option[String],
-      valueMountedFromSubmitter: Option[String],
-      mountedCanonicalLocation: String): Option[String] = {
-    mountedUserSpecified.orElse(valueMountedFromSubmitter.map { _ =>
-      mountedCanonicalLocation
-    })
-  }
-
-  /**
-   * Resolve a Kubernetes secret data entry from an optional client credential used by the
-   * driver to talk to the Kubernetes API server.
-   *
-   * @param userSpecifiedCredential the optional user-specified client credential.
-   * @param secretName name of the Kubernetes secret storing the client credential.
-   * @return a secret data entry in the form of a map from the secret name to the secret data,
-   *         which may be empty if the user-specified credential is empty.
-   */
-  private def resolveSecretData(
-      userSpecifiedCredential: Option[String],
-      secretName: String): Map[String, String] = {
-    userSpecifiedCredential.map { valueBase64 =>
-      Map(secretName -> valueBase64)
-    }.getOrElse(Map.empty[String, String])
-  }
-
-  private implicit def augmentSparkConf(sparkConf: SparkConf): OptionSettableSparkConf = {
-    new OptionSettableSparkConf(sparkConf)
-  }
-}
-
-private class OptionSettableSparkConf(sparkConf: SparkConf) {
-  def setOption(configEntry: String, option: Option[String]): SparkConf = {
-    option.foreach { opt =>
-      sparkConf.set(configEntry, opt)
-    }
-    sparkConf
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala
deleted file mode 100644
index 91e9a9f..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * A driver configuration step for mounting user-specified secrets onto user-specified paths.
- *
- * @param bootstrap a utility actually handling mounting of the secrets.
- */
-private[spark] class DriverMountSecretsStep(
-    bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val pod = bootstrap.addSecretVolumes(driverSpec.driverPod)
-    val container = bootstrap.mountSecrets(driverSpec.driverContainer)
-    driverSpec.copy(
-      driverPod = pod,
-      driverContainer = container
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
deleted file mode 100644
index 34af7cd..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.ServiceBuilder
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Clock
-
-/**
- * Allows the driver to be reachable by executor pods through a headless service. The service's
- * ports should correspond to the ports that the executor will reach the pod at for RPC.
- */
-private[spark] class DriverServiceBootstrapStep(
-    resourceNamePrefix: String,
-    driverLabels: Map[String, String],
-    sparkConf: SparkConf,
-    clock: Clock) extends DriverConfigurationStep with Logging {
-
-  import DriverServiceBootstrapStep._
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    require(sparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
-      s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " +
-      "address is managed and set to the driver pod's IP address.")
-    require(sparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
-      s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
-      "managed via a Kubernetes service.")
-
-    val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
-    val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
-      preferredServiceName
-    } else {
-      val randomServiceId = clock.getTimeMillis()
-      val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
-      logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
-        s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
-        s"$shorterServiceName as the driver service's name.")
-      shorterServiceName
-    }
-
-    val driverPort = sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
-    val driverBlockManagerPort = sparkConf.getInt(
-        org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
-    val driverService = new ServiceBuilder()
-      .withNewMetadata()
-        .withName(resolvedServiceName)
-        .endMetadata()
-      .withNewSpec()
-        .withClusterIP("None")
-        .withSelector(driverLabels.asJava)
-        .addNewPort()
-          .withName(DRIVER_PORT_NAME)
-          .withPort(driverPort)
-          .withNewTargetPort(driverPort)
-          .endPort()
-        .addNewPort()
-          .withName(BLOCK_MANAGER_PORT_NAME)
-          .withPort(driverBlockManagerPort)
-          .withNewTargetPort(driverBlockManagerPort)
-          .endPort()
-        .endSpec()
-      .build()
-
-    val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
-    val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc"
-    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
-      .set(DRIVER_HOST_KEY, driverHostname)
-      .set("spark.driver.port", driverPort.toString)
-      .set(
-        org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
-
-    driverSpec.copy(
-      driverSparkConf = resolvedSparkConf,
-      otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
-  }
-}
-
-private[spark] object DriverServiceBootstrapStep {
-  val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
-  val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
-  val DRIVER_SVC_POSTFIX = "-driver-svc"
-  val MAX_SERVICE_NAME_LENGTH = 63
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
deleted file mode 100644
index 8607d6f..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.scheduler.cluster.k8s
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model._
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
-import org.apache.spark.util.Utils
-
-/**
- * A factory class for bootstrapping and creating executor pods with the given bootstrapping
- * components.
- *
- * @param sparkConf Spark configuration
- * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
- *                              user-specified paths into the executor container
- */
-private[spark] class ExecutorPodFactory(
-    sparkConf: SparkConf,
-    mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
-
-  private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
-
-  private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-    sparkConf,
-    KUBERNETES_EXECUTOR_LABEL_PREFIX)
-  require(
-    !executorLabels.contains(SPARK_APP_ID_LABEL),
-    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
-  require(
-    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-    s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
-      " Spark.")
-  require(
-    !executorLabels.contains(SPARK_ROLE_LABEL),
-    s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
-
-  private val executorAnnotations =
-    KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-  private val nodeSelector =
-    KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_NODE_SELECTOR_PREFIX)
-
-  private val executorContainerImage = sparkConf
-    .get(EXECUTOR_CONTAINER_IMAGE)
-    .getOrElse(throw new SparkException("Must specify the executor container image"))
-  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
-  private val blockManagerPort = sparkConf
-    .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
-
-  private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
-
-  private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = sparkConf.get(
-    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
-
-  private val memoryOverheadMiB = sparkConf
-    .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
-      MEMORY_OVERHEAD_MIN_MIB))
-  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
-
-  private val executorCores = sparkConf.getInt("spark.executor.cores", 1)
-  private val executorCoresRequest = if (sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
-    sparkConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
-  } else {
-    executorCores.toString
-  }
-  private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
-
-  /**
-   * Configure and construct an executor pod with the given parameters.
-   */
-  def createExecutorPod(
-      executorId: String,
-      applicationId: String,
-      driverUrl: String,
-      executorEnvs: Seq[(String, String)],
-      driverPod: Pod,
-      nodeToLocalTaskCount: Map[String, Int]): Pod = {
-    val name = s"$executorPodNamePrefix-exec-$executorId"
-
-    val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
-
-    // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
-    // name as the hostname.  This preserves uniqueness since the end of name contains
-    // executorId
-    val hostname = name.substring(Math.max(0, name.length - 63))
-    val resolvedExecutorLabels = Map(
-      SPARK_EXECUTOR_ID_LABEL -> executorId,
-      SPARK_APP_ID_LABEL -> applicationId,
-      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
-      executorLabels
-    val executorMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(s"${executorMemoryWithOverhead}Mi")
-      .build()
-    val executorCpuQuantity = new QuantityBuilder(false)
-      .withAmount(executorCoresRequest)
-      .build()
-    val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
-      new EnvVarBuilder()
-        .withName(ENV_CLASSPATH)
-        .withValue(cp)
-        .build()
-    }
-    val executorExtraJavaOptionsEnv = sparkConf
-      .get(EXECUTOR_JAVA_OPTIONS)
-      .map { opts =>
-        val delimitedOpts = Utils.splitCommandString(opts)
-        delimitedOpts.zipWithIndex.map {
-          case (opt, index) =>
-            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
-        }
-      }.getOrElse(Seq.empty[EnvVar])
-    val executorEnv = (Seq(
-      (ENV_DRIVER_URL, driverUrl),
-      (ENV_EXECUTOR_CORES, executorCores.toString),
-      (ENV_EXECUTOR_MEMORY, executorMemoryString),
-      (ENV_APPLICATION_ID, applicationId),
-      // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
-      (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-      (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs)
-      .map(env => new EnvVarBuilder()
-        .withName(env._1)
-        .withValue(env._2)
-        .build()
-      ) ++ Seq(
-      new EnvVarBuilder()
-        .withName(ENV_EXECUTOR_POD_IP)
-        .withValueFrom(new EnvVarSourceBuilder()
-          .withNewFieldRef("v1", "status.podIP")
-          .build())
-        .build()
-    ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
-    val requiredPorts = Seq(
-      (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
-      .map { case (name, port) =>
-        new ContainerPortBuilder()
-          .withName(name)
-          .withContainerPort(port)
-          .build()
-      }
-
-    val executorContainer = new ContainerBuilder()
-      .withName("executor")
-      .withImage(executorContainerImage)
-      .withImagePullPolicy(imagePullPolicy)
-      .withNewResources()
-        .addToRequests("memory", executorMemoryQuantity)
-        .addToLimits("memory", executorMemoryQuantity)
-        .addToRequests("cpu", executorCpuQuantity)
-        .endResources()
-      .addAllToEnv(executorEnv.asJava)
-      .withPorts(requiredPorts.asJava)
-      .addToArgs("executor")
-      .build()
-
-    val executorPod = new PodBuilder()
-      .withNewMetadata()
-        .withName(name)
-        .withLabels(resolvedExecutorLabels.asJava)
-        .withAnnotations(executorAnnotations.asJava)
-        .withOwnerReferences()
-          .addNewOwnerReference()
-            .withController(true)
-            .withApiVersion(driverPod.getApiVersion)
-            .withKind(driverPod.getKind)
-            .withName(driverPod.getMetadata.getName)
-            .withUid(driverPod.getMetadata.getUid)
-            .endOwnerReference()
-        .endMetadata()
-      .withNewSpec()
-        .withHostname(hostname)
-        .withRestartPolicy("Never")
-        .withNodeSelector(nodeSelector.asJava)
-        .withImagePullSecrets(parsedImagePullSecrets.asJava)
-        .endSpec()
-      .build()
-
-    val containerWithLimitCores = executorLimitCores.map { limitCores =>
-      val executorCpuLimitQuantity = new QuantityBuilder(false)
-        .withAmount(limitCores)
-        .build()
-      new ContainerBuilder(executorContainer)
-        .editResources()
-        .addToLimits("cpu", executorCpuLimitQuantity)
-        .endResources()
-        .build()
-    }.getOrElse(executorContainer)
-
-    val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
-      mountSecretsBootstrap.map { bootstrap =>
-        (bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
-      }.getOrElse((executorPod, containerWithLimitCores))
-
-
-    new PodBuilder(maybeSecretsMountedPod)
-      .editSpec()
-        .addToContainers(maybeSecretsMountedContainer)
-        .endSpec()
-      .build()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index ff5f680..0ea80df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,7 +21,7 @@ import java.io.File
 import io.fabric8.kubernetes.client.Config
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
@@ -48,12 +48,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       scheduler: TaskScheduler): SchedulerBackend = {
     val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
       sc.conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
-    val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
-      Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
-    } else {
-      None
-    }
-
     val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
       KUBERNETES_MASTER_INTERNAL_URL,
       Some(sc.conf.get(KUBERNETES_NAMESPACE)),
@@ -62,8 +56,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-    val executorPodFactory = new ExecutorPodFactory(sc.conf, mountSecretBootstrap)
-
     val allocatorExecutor = ThreadUtils
       .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
@@ -71,7 +63,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     new KubernetesClusterSchedulerBackend(
       scheduler.asInstanceOf[TaskSchedulerImpl],
       sc.env.rpcEnv,
-      executorPodFactory,
+      new KubernetesExecutorBuilder,
       kubernetesClient,
       allocatorExecutor,
       requestExecutorsService)

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 9de4b16..d86664c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -32,6 +32,7 @@ import scala.concurrent.{ExecutionContext, Future}
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -40,7 +41,7 @@ import org.apache.spark.util.Utils
 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     rpcEnv: RpcEnv,
-    executorPodFactory: ExecutorPodFactory,
+    executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     allocatorExecutor: ScheduledExecutorService,
     requestExecutorsService: ExecutorService)
@@ -115,14 +116,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
           for (_ <- 0 until math.min(
             currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
             val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
-            val executorPod = executorPodFactory.createExecutorPod(
+            val executorConf = KubernetesConf.createExecutorConf(
+              conf,
               executorId,
               applicationId(),
-              driverUrl,
-              conf.getExecutorEnv,
-              driverPod,
-              currentNodeToLocalTaskCount)
-            executorsToAllocate(executorId) = executorPod
+              driverPod)
+            val executorPod = executorBuilder.buildFromFeatures(executorConf)
+            val podWithAttachedContainer = new PodBuilder(executorPod.pod)
+              .editOrNewSpec()
+                .addToContainers(executorPod.container)
+                .endSpec()
+              .build()
+
+            executorsToAllocate(executorId) = podWithAttachedContainer
             logInfo(
               s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
new file mode 100644
index 0000000..22568fe
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, MountSecretsFeatureStep}
+
+private[spark] class KubernetesExecutorBuilder(
+    provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
+      new BasicExecutorFeatureStep(_),
+    provideSecretsStep:
+      (KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep =
+      new MountSecretsFeatureStep(_)) {
+
+  def buildFromFeatures(
+    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
+    val baseFeatures = Seq(provideBasicStep(kubernetesConf))
+    val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
+    } else baseFeatures
+    var executorPod = SparkPod.initialPod()
+    for (feature <- allFeatures) {
+      executorPod = feature.configurePod(executorPod)
+    }
+    executorPod
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
new file mode 100644
index 0000000..f10202f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{LocalObjectReferenceBuilder, PodBuilder}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+
+class KubernetesConfSuite extends SparkFunSuite {
+
+  private val APP_NAME = "test-app"
+  private val RESOURCE_NAME_PREFIX = "prefix"
+  private val APP_ID = "test-id"
+  private val MAIN_CLASS = "test-class"
+  private val APP_ARGS = Array("arg1", "arg2")
+  private val CUSTOM_LABELS = Map(
+    "customLabel1Key" -> "customLabel1Value",
+    "customLabel2Key" -> "customLabel2Value")
+  private val CUSTOM_ANNOTATIONS = Map(
+    "customAnnotation1Key" -> "customAnnotation1Value",
+    "customAnnotation2Key" -> "customAnnotation2Value")
+  private val SECRET_NAMES_TO_MOUNT_PATHS = Map(
+    "secret1" -> "/mnt/secrets/secret1",
+    "secret2" -> "/mnt/secrets/secret2")
+  private val CUSTOM_ENVS = Map(
+    "customEnvKey1" -> "customEnvValue1",
+    "customEnvKey2" -> "customEnvValue2")
+  private val DRIVER_POD = new PodBuilder().build()
+  private val EXECUTOR_ID = "executor-id"
+
+  test("Basic driver translated fields.") {
+    val sparkConf = new SparkConf(false)
+    val conf = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      None,
+      MAIN_CLASS,
+      APP_ARGS)
+    assert(conf.appId === APP_ID)
+    assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
+    assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
+    assert(conf.roleSpecificConf.appName === APP_NAME)
+    assert(conf.roleSpecificConf.mainAppResource.isEmpty)
+    assert(conf.roleSpecificConf.mainClass === MAIN_CLASS)
+    assert(conf.roleSpecificConf.appArgs === APP_ARGS)
+  }
+
+  test("Creating driver conf with and without the main app jar influences spark.jars") {
+    val sparkConf = new SparkConf(false)
+      .setJars(Seq("local:///opt/spark/jar1.jar"))
+    val mainAppJar = Some(JavaMainAppResource("local:///opt/spark/main.jar"))
+    val kubernetesConfWithMainJar = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      mainAppJar,
+      MAIN_CLASS,
+      APP_ARGS)
+    assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars")
+      .split(",")
+      === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar"))
+    val kubernetesConfWithoutMainJar = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      None,
+      MAIN_CLASS,
+      APP_ARGS)
+    assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
+      === Array("local:///opt/spark/jar1.jar"))
+  }
+
+  test("Resolve driver labels, annotations, secret mount paths, and envs.") {
+    val sparkConf = new SparkConf(false)
+    CUSTOM_LABELS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$key", value)
+    }
+    CUSTOM_ANNOTATIONS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$key", value)
+    }
+    SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$key", value)
+    }
+    CUSTOM_ENVS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_DRIVER_ENV_PREFIX$key", value)
+    }
+
+    val conf = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      None,
+      MAIN_CLASS,
+      APP_ARGS)
+    assert(conf.roleLabels === Map(
+      SPARK_APP_ID_LABEL -> APP_ID,
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
+      CUSTOM_LABELS)
+    assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
+    assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
+    assert(conf.roleEnvs === CUSTOM_ENVS)
+  }
+
+  test("Basic executor translated fields.") {
+    val conf = KubernetesConf.createExecutorConf(
+      new SparkConf(false),
+      EXECUTOR_ID,
+      APP_ID,
+      DRIVER_POD)
+    assert(conf.roleSpecificConf.executorId === EXECUTOR_ID)
+    assert(conf.roleSpecificConf.driverPod === DRIVER_POD)
+  }
+
+  test("Image pull secrets.") {
+    val conf = KubernetesConf.createExecutorConf(
+      new SparkConf(false)
+        .set(IMAGE_PULL_SECRETS, "my-secret-1,my-secret-2 "),
+      EXECUTOR_ID,
+      APP_ID,
+      DRIVER_POD)
+    assert(conf.imagePullSecrets() ===
+      Seq(
+        new LocalObjectReferenceBuilder().withName("my-secret-1").build(),
+        new LocalObjectReferenceBuilder().withName("my-secret-2").build()))
+  }
+
+  test("Set executor labels, annotations, and secrets") {
+    val sparkConf = new SparkConf(false)
+    CUSTOM_LABELS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_EXECUTOR_LABEL_PREFIX$key", value)
+    }
+    CUSTOM_ANNOTATIONS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_EXECUTOR_ANNOTATION_PREFIX$key", value)
+    }
+    SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRETS_PREFIX$key", value)
+    }
+
+    val conf = KubernetesConf.createExecutorConf(
+      sparkConf,
+      EXECUTOR_ID,
+      APP_ID,
+      DRIVER_POD)
+    assert(conf.roleLabels === Map(
+      SPARK_EXECUTOR_ID_LABEL -> EXECUTOR_ID,
+      SPARK_APP_ID_LABEL -> APP_ID,
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS)
+    assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
+    assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
deleted file mode 100644
index cf41b22..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import io.fabric8.kubernetes.api.model.LocalObjectReference
-
-import org.apache.spark.SparkFunSuite
-
-class KubernetesUtilsTest extends SparkFunSuite {
-
-  test("testParseImagePullSecrets") {
-    val noSecrets = KubernetesUtils.parseImagePullSecrets(None)
-    assert(noSecrets === Nil)
-
-    val oneSecret = KubernetesUtils.parseImagePullSecrets(Some("imagePullSecret"))
-    assert(oneSecret === new LocalObjectReference("imagePullSecret") :: Nil)
-
-    val commaSeparatedSecrets = KubernetesUtils.parseImagePullSecrets(Some("s1, s2  , s3,s4"))
-    assert(commaSeparatedSecrets.map(_.getName) === "s1" :: "s2" :: "s3" :: "s4" :: Nil)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
new file mode 100644
index 0000000..eee85b8
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.LocalObjectReferenceBuilder
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+class BasicDriverFeatureStepSuite extends SparkFunSuite {
+
+  private val APP_ID = "spark-app-id"
+  private val RESOURCE_NAME_PREFIX = "spark"
+  private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
+  private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
+  private val APP_NAME = "spark-test"
+  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+  private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
+  private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
+  private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
+  private val DRIVER_ANNOTATIONS = Map(CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE)
+  private val DRIVER_CUSTOM_ENV1 = "customDriverEnv1"
+  private val DRIVER_CUSTOM_ENV2 = "customDriverEnv2"
+  private val DRIVER_ENVS = Map(
+    DRIVER_CUSTOM_ENV1 -> DRIVER_CUSTOM_ENV1,
+    DRIVER_CUSTOM_ENV2 -> DRIVER_CUSTOM_ENV2)
+  private val TEST_IMAGE_PULL_SECRETS = Seq("my-secret-1", "my-secret-2")
+  private val TEST_IMAGE_PULL_SECRET_OBJECTS =
+    TEST_IMAGE_PULL_SECRETS.map { secret =>
+      new LocalObjectReferenceBuilder().withName(secret).build()
+    }
+
+  test("Check the pod respects all configurations from the user.") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set("spark.driver.cores", "2")
+      .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        None,
+        APP_NAME,
+        MAIN_CLASS,
+        APP_ARGS),
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      DRIVER_LABELS,
+      DRIVER_ANNOTATIONS,
+      Map.empty,
+      DRIVER_ENVS)
+
+    val featureStep = new BasicDriverFeatureStep(kubernetesConf)
+    val basePod = SparkPod.initialPod()
+    val configuredPod = featureStep.configurePod(basePod)
+
+    assert(configuredPod.container.getName === DRIVER_CONTAINER_NAME)
+    assert(configuredPod.container.getImage === "spark-driver:latest")
+    assert(configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
+
+    assert(configuredPod.container.getEnv.size === 3)
+    val envs = configuredPod.container
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(DRIVER_CUSTOM_ENV1) === DRIVER_ENVS(DRIVER_CUSTOM_ENV1))
+    assert(envs(DRIVER_CUSTOM_ENV2) === DRIVER_ENVS(DRIVER_CUSTOM_ENV2))
+
+    assert(configuredPod.pod.getSpec().getImagePullSecrets.asScala ===
+      TEST_IMAGE_PULL_SECRET_OBJECTS)
+
+    assert(configuredPod.container.getEnv.asScala.exists(envVar =>
+      envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
+        envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
+        envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
+
+    val resourceRequirements = configuredPod.container.getResources
+    val requests = resourceRequirements.getRequests.asScala
+    assert(requests("cpu").getAmount === "2")
+    assert(requests("memory").getAmount === "456Mi")
+    val limits = resourceRequirements.getLimits.asScala
+    assert(limits("memory").getAmount === "456Mi")
+    assert(limits("cpu").getAmount === "4")
+
+    val driverPodMetadata = configuredPod.pod.getMetadata
+    assert(driverPodMetadata.getName === "spark-driver-pod")
+    assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
+    assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS)
+    assert(configuredPod.pod.getSpec.getRestartPolicy === "Never")
+
+    val expectedSparkConf = Map(
+      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
+      "spark.app.id" -> APP_ID,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
+      "spark.kubernetes.submitInDriver" -> "true")
+    assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
+  }
+
+  test("Additional system properties resolve jars and set cluster-mode confs.") {
+    val allJars = Seq("local:///opt/spark/jar1.jar", "hdfs:///opt/spark/jar2.jar")
+    val allFiles = Seq("https://localhost:9000/file1.txt", "local:///opt/spark/file2.txt")
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .setJars(allJars)
+      .set("spark.files", allFiles.mkString(","))
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        None,
+        APP_NAME,
+        MAIN_CLASS,
+        APP_ARGS),
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      DRIVER_LABELS,
+      DRIVER_ANNOTATIONS,
+      Map.empty,
+      Map.empty)
+    val step = new BasicDriverFeatureStep(kubernetesConf)
+    val additionalProperties = step.getAdditionalPodSystemProperties()
+    val expectedSparkConf = Map(
+      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
+      "spark.app.id" -> APP_ID,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
+      "spark.kubernetes.submitInDriver" -> "true",
+      "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
+      "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt")
+    assert(additionalProperties === expectedSparkConf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
new file mode 100644
index 0000000..a764f76
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+import org.mockito.MockitoAnnotations
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+
+class BasicExecutorFeatureStepSuite
+  extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
+
+  private val APP_ID = "app-id"
+  private val DRIVER_HOSTNAME = "localhost"
+  private val DRIVER_PORT = 7098
+  private val DRIVER_ADDRESS = RpcEndpointAddress(
+    DRIVER_HOSTNAME,
+    DRIVER_PORT.toInt,
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
+  private val DRIVER_POD_NAME = "driver-pod"
+
+  private val DRIVER_POD_UID = "driver-uid"
+  private val RESOURCE_NAME_PREFIX = "base"
+  private val EXECUTOR_IMAGE = "executor-image"
+  private val LABELS = Map("label1key" -> "label1value")
+  private val ANNOTATIONS = Map("annotation1key" -> "annotation1value")
+  private val TEST_IMAGE_PULL_SECRETS = Seq("my-1secret-1", "my-secret-2")
+  private val TEST_IMAGE_PULL_SECRET_OBJECTS =
+    TEST_IMAGE_PULL_SECRETS.map { secret =>
+      new LocalObjectReferenceBuilder().withName(secret).build()
+    }
+  private val DRIVER_POD = new PodBuilder()
+    .withNewMetadata()
+      .withName(DRIVER_POD_NAME)
+      .withUid(DRIVER_POD_UID)
+      .endMetadata()
+    .withNewSpec()
+      .withNodeName("some-node")
+      .endSpec()
+    .withNewStatus()
+      .withHostIP("192.168.99.100")
+      .endStatus()
+    .build()
+  private var baseConf: SparkConf = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    baseConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
+      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX)
+      .set(CONTAINER_IMAGE, EXECUTOR_IMAGE)
+      .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
+      .set("spark.driver.host", DRIVER_HOSTNAME)
+      .set("spark.driver.port", DRIVER_PORT.toString)
+      .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
+  }
+
+  test("basic executor pod has reasonable defaults") {
+    val step = new BasicExecutorFeatureStep(
+      KubernetesConf(
+        baseConf,
+        KubernetesExecutorSpecificConf("1", DRIVER_POD),
+        RESOURCE_NAME_PREFIX,
+        APP_ID,
+        LABELS,
+        ANNOTATIONS,
+        Map.empty,
+        Map.empty))
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    // The executor pod name and default labels.
+    assert(executor.pod.getMetadata.getName === s"$RESOURCE_NAME_PREFIX-exec-1")
+    assert(executor.pod.getMetadata.getLabels.asScala === LABELS)
+    assert(executor.pod.getSpec.getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS)
+
+    // There is exactly 1 container with no volume mounts and default memory limits.
+    // Default memory limit is 1024M + 384M (minimum overhead constant).
+    assert(executor.container.getImage === EXECUTOR_IMAGE)
+    assert(executor.container.getVolumeMounts.isEmpty)
+    assert(executor.container.getResources.getLimits.size() === 1)
+    assert(executor.container.getResources
+      .getLimits.get("memory").getAmount === "1408Mi")
+
+    // The pod has no node selector, volumes.
+    assert(executor.pod.getSpec.getNodeSelector.isEmpty)
+    assert(executor.pod.getSpec.getVolumes.isEmpty)
+
+    checkEnv(executor, Map())
+    checkOwnerReferences(executor.pod, DRIVER_POD_UID)
+  }
+
+  test("executor pod hostnames get truncated to 63 characters") {
+    val conf = baseConf.clone()
+    val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple"
+
+    val step = new BasicExecutorFeatureStep(
+      KubernetesConf(
+        conf,
+        KubernetesExecutorSpecificConf("1", DRIVER_POD),
+        longPodNamePrefix,
+        APP_ID,
+        LABELS,
+        ANNOTATIONS,
+        Map.empty,
+        Map.empty))
+    assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
+  }
+
+  test("classpath and extra java options get translated into environment variables") {
+    val conf = baseConf.clone()
+    conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
+    conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
+
+    val step = new BasicExecutorFeatureStep(
+      KubernetesConf(
+        conf,
+        KubernetesExecutorSpecificConf("1", DRIVER_POD),
+        RESOURCE_NAME_PREFIX,
+        APP_ID,
+        LABELS,
+        ANNOTATIONS,
+        Map.empty,
+        Map("qux" -> "quux")))
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    checkEnv(executor,
+      Map("SPARK_JAVA_OPT_0" -> "foo=bar",
+        ENV_CLASSPATH -> "bar=baz",
+        "qux" -> "quux"))
+    checkOwnerReferences(executor.pod, DRIVER_POD_UID)
+  }
+
+  // There is always exactly one controller reference, and it points to the driver pod.
+  private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
+    assert(executor.getMetadata.getOwnerReferences.size() === 1)
+    assert(executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid)
+    assert(executor.getMetadata.getOwnerReferences.get(0).getController === true)
+  }
+
+  // Check that the expected environment variables are present.
+  private def checkEnv(executorPod: SparkPod, additionalEnvVars: Map[String, String]): Unit = {
+    val defaultEnvs = Map(
+      ENV_EXECUTOR_ID -> "1",
+      ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
+      ENV_EXECUTOR_CORES -> "1",
+      ENV_EXECUTOR_MEMORY -> "1g",
+      ENV_APPLICATION_ID -> APP_ID,
+      ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
+      ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
+
+    assert(executorPod.container.getEnv.size() === defaultEnvs.size)
+    val mapEnvs = executorPod.container.getEnv.asScala.map {
+      x => (x.getName, x.getValue)
+    }.toMap
+    assert(defaultEnvs === mapEnvs)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
new file mode 100644
index 0000000..9f817d3
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret}
+import org.mockito.{Mock, MockitoAnnotations}
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Utils
+
+class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val KUBERNETES_RESOURCE_NAME_PREFIX = "spark"
+  private val APP_ID = "k8s-app"
+  private var credentialsTempDirectory: File = _
+  private val BASE_DRIVER_POD = SparkPod.initialPod()
+
+  @Mock
+  private var driverSpecificConf: KubernetesDriverSpecificConf = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    credentialsTempDirectory = Utils.createTempDir()
+  }
+
+  after {
+    credentialsTempDirectory.delete()
+  }
+
+  test("Don't set any credentials") {
+    val kubernetesConf = KubernetesConf(
+      new SparkConf(false),
+      driverSpecificConf,
+      KUBERNETES_RESOURCE_NAME_PREFIX,
+      APP_ID,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
+    assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
+    assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty)
+    assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  test("Only set credentials that are manually mounted.") {
+    val submissionSparkConf = new SparkConf(false)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-token.txt")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-key.pem")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-cert.pem")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-ca.pem")
+    val kubernetesConf = KubernetesConf(
+      submissionSparkConf,
+      driverSpecificConf,
+      KUBERNETES_RESOURCE_NAME_PREFIX,
+      APP_ID,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
+    assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
+    assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().isEmpty)
+    val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
+    resolvedProperties.foreach { case (propKey, propValue) =>
+      assert(submissionSparkConf.get(propKey) === propValue)
+    }
+  }
+
+  test("Mount credentials from the submission client as a secret.") {
+    val caCertFile = writeCredentials("ca.pem", "ca-cert")
+    val clientKeyFile = writeCredentials("key.pem", "key")
+    val clientCertFile = writeCredentials("cert.pem", "cert")
+    val submissionSparkConf = new SparkConf(false)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX",
+        "token")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+        clientKeyFile.getAbsolutePath)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+        clientCertFile.getAbsolutePath)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+        caCertFile.getAbsolutePath)
+    val kubernetesConf = KubernetesConf(
+      submissionSparkConf,
+      driverSpecificConf,
+      KUBERNETES_RESOURCE_NAME_PREFIX,
+      APP_ID,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
+    val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
+    val expectedSparkConf = Map(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX" -> "<present_but_redacted>",
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_CLIENT_KEY_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_CLIENT_CERT_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_CA_CERT_PATH)
+    assert(resolvedProperties === expectedSparkConf)
+    assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().size === 1)
+    val credentialsSecret = kubernetesCredentialsStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Secret]
+    assert(credentialsSecret.getMetadata.getName ===
+      s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials")
+    val decodedSecretData = credentialsSecret.getData.asScala.map { data =>
+      (data._1, new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8))
+    }
+    val expectedSecretData = Map(
+      DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME -> "ca-cert",
+      DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME -> "token",
+      DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME -> "key",
+      DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME -> "cert")
+    assert(decodedSecretData === expectedSecretData)
+    val driverPod = kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD)
+    val driverPodVolumes = driverPod.pod.getSpec.getVolumes.asScala
+    assert(driverPodVolumes.size === 1)
+    assert(driverPodVolumes.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+    assert(driverPodVolumes.head.getSecret != null)
+    assert(driverPodVolumes.head.getSecret.getSecretName === credentialsSecret.getMetadata.getName)
+    val driverContainerVolumeMount = driverPod.container.getVolumeMounts.asScala
+    assert(driverContainerVolumeMount.size === 1)
+    assert(driverContainerVolumeMount.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+    assert(driverContainerVolumeMount.head.getMountPath === DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
+  }
+
+  private def writeCredentials(credentialsFileName: String, credentialsContents: String): File = {
+    val credentialsFile = new File(credentialsTempDirectory, credentialsFileName)
+    Files.write(credentialsContents, credentialsFile, Charsets.UTF_8)
+    credentialsFile
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
new file mode 100644
index 0000000..c299d56
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.Service
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Clock
+
+class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val SHORT_RESOURCE_NAME_PREFIX =
+    "a" * (DriverServiceFeatureStep.MAX_SERVICE_NAME_LENGTH -
+      DriverServiceFeatureStep.DRIVER_SVC_POSTFIX.length)
+
+  private val LONG_RESOURCE_NAME_PREFIX =
+    "a" * (DriverServiceFeatureStep.MAX_SERVICE_NAME_LENGTH -
+      DriverServiceFeatureStep.DRIVER_SVC_POSTFIX.length + 1)
+  private val DRIVER_LABELS = Map(
+    "label1key" -> "label1value",
+    "label2key" -> "label2value")
+
+  @Mock
+  private var clock: Clock = _
+
+  private var sparkConf: SparkConf = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    sparkConf = new SparkConf(false)
+  }
+
+  test("Headless service has a port for the driver RPC and the block manager.") {
+    sparkConf = sparkConf
+      .set("spark.driver.port", "9000")
+      .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
+    val configurationStep = new DriverServiceFeatureStep(
+      KubernetesConf(
+        sparkConf,
+        KubernetesDriverSpecificConf(
+          None, "main", "app", Seq.empty),
+        SHORT_RESOURCE_NAME_PREFIX,
+        "app-id",
+        DRIVER_LABELS,
+        Map.empty,
+        Map.empty,
+        Map.empty))
+    assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
+    assert(configurationStep.getAdditionalKubernetesResources().size === 1)
+    assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service])
+    val driverService = configurationStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Service]
+    verifyService(
+      9000,
+      8080,
+      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
+      driverService)
+  }
+
+  test("Hostname and ports are set according to the service name.") {
+    val configurationStep = new DriverServiceFeatureStep(
+      KubernetesConf(
+        sparkConf
+          .set("spark.driver.port", "9000")
+          .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
+          .set(KUBERNETES_NAMESPACE, "my-namespace"),
+        KubernetesDriverSpecificConf(
+          None, "main", "app", Seq.empty),
+        SHORT_RESOURCE_NAME_PREFIX,
+        "app-id",
+        DRIVER_LABELS,
+        Map.empty,
+        Map.empty,
+        Map.empty))
+    val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
+      DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
+    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
+    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
+    verifySparkConfHostNames(additionalProps, expectedHostName)
+  }
+
+  test("Ports should resolve to defaults in SparkConf and in the service.") {
+    val configurationStep = new DriverServiceFeatureStep(
+      KubernetesConf(
+        sparkConf,
+        KubernetesDriverSpecificConf(
+          None, "main", "app", Seq.empty),
+        SHORT_RESOURCE_NAME_PREFIX,
+        "app-id",
+        DRIVER_LABELS,
+        Map.empty,
+        Map.empty,
+        Map.empty))
+    val resolvedService = configurationStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Service]
+    verifyService(
+      DEFAULT_DRIVER_PORT,
+      DEFAULT_BLOCKMANAGER_PORT,
+      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
+      resolvedService)
+    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
+    assert(additionalProps("spark.driver.port") === DEFAULT_DRIVER_PORT.toString)
+    assert(additionalProps(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key)
+      === DEFAULT_BLOCKMANAGER_PORT.toString)
+  }
+
+  test("Long prefixes should switch to using a generated name.") {
+    when(clock.getTimeMillis()).thenReturn(10000)
+    val configurationStep = new DriverServiceFeatureStep(
+      KubernetesConf(
+        sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
+        KubernetesDriverSpecificConf(
+          None, "main", "app", Seq.empty),
+        LONG_RESOURCE_NAME_PREFIX,
+        "app-id",
+        DRIVER_LABELS,
+        Map.empty,
+        Map.empty,
+        Map.empty),
+      clock)
+    val driverService = configurationStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Service]
+    val expectedServiceName = s"spark-10000${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}"
+    assert(driverService.getMetadata.getName === expectedServiceName)
+    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
+    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
+    verifySparkConfHostNames(additionalProps, expectedHostName)
+  }
+
+  test("Disallow bind address and driver host to be set explicitly.") {
+    try {
+      new DriverServiceFeatureStep(
+        KubernetesConf(
+          sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
+          KubernetesDriverSpecificConf(
+            None, "main", "app", Seq.empty),
+          LONG_RESOURCE_NAME_PREFIX,
+          "app-id",
+          DRIVER_LABELS,
+          Map.empty,
+          Map.empty,
+          Map.empty),
+        clock)
+      fail("The driver bind address should not be allowed.")
+    } catch {
+      case e: Throwable =>
+        assert(e.getMessage ===
+          s"requirement failed: ${DriverServiceFeatureStep.DRIVER_BIND_ADDRESS_KEY} is" +
+          " not supported in Kubernetes mode, as the driver's bind address is managed" +
+          " and set to the driver pod's IP address.")
+    }
+    sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS)
+    sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host")
+    try {
+      new DriverServiceFeatureStep(
+        KubernetesConf(
+          sparkConf,
+          KubernetesDriverSpecificConf(
+            None, "main", "app", Seq.empty),
+          LONG_RESOURCE_NAME_PREFIX,
+          "app-id",
+          DRIVER_LABELS,
+          Map.empty,
+          Map.empty,
+          Map.empty),
+        clock)
+      fail("The driver host address should not be allowed.")
+    } catch {
+      case e: Throwable =>
+        assert(e.getMessage ===
+          s"requirement failed: ${DriverServiceFeatureStep.DRIVER_HOST_KEY} is" +
+          " not supported in Kubernetes mode, as the driver's hostname will be managed via" +
+          " a Kubernetes service.")
+    }
+  }
+
+  private def verifyService(
+      driverPort: Int,
+      blockManagerPort: Int,
+      expectedServiceName: String,
+      service: Service): Unit = {
+    assert(service.getMetadata.getName === expectedServiceName)
+    assert(service.getSpec.getClusterIP === "None")
+    assert(service.getSpec.getSelector.asScala === DRIVER_LABELS)
+    assert(service.getSpec.getPorts.size() === 2)
+    val driverServicePorts = service.getSpec.getPorts.asScala
+    assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
+    assert(driverServicePorts.head.getPort.intValue() === driverPort)
+    assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort)
+    assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
+    assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
+    assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
+  }
+
+  private def verifySparkConfHostNames(
+      driverSparkConf: Map[String, String], expectedHostName: String): Unit = {
+    assert(driverSparkConf(
+      org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
new file mode 100644
index 0000000..27bff74
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{HasMetadata, PodBuilder, SecretBuilder}
+import org.mockito.Matchers
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import org.apache.spark.deploy.k8s.SparkPod
+
+object KubernetesFeaturesTestUtils {
+
+  def getMockConfigStepForStepType[T <: KubernetesFeatureConfigStep](
+    stepType: String, stepClass: Class[T]): T = {
+    val mockStep = mock(stepClass)
+    when(mockStep.getAdditionalKubernetesResources()).thenReturn(
+      getSecretsForStepType(stepType))
+
+    when(mockStep.getAdditionalPodSystemProperties())
+      .thenReturn(Map(stepType -> stepType))
+    when(mockStep.configurePod(Matchers.any(classOf[SparkPod])))
+      .thenAnswer(new Answer[SparkPod]() {
+        override def answer(invocation: InvocationOnMock): SparkPod = {
+          val originalPod = invocation.getArgumentAt(0, classOf[SparkPod])
+          val configuredPod = new PodBuilder(originalPod.pod)
+            .editOrNewMetadata()
+            .addToLabels(stepType, stepType)
+            .endMetadata()
+            .build()
+          SparkPod(configuredPod, originalPod.container)
+        }
+      })
+    mockStep
+  }
+
+  def getSecretsForStepType[T <: KubernetesFeatureConfigStep](stepType: String)
+    : Seq[HasMetadata] = {
+    Seq(new SecretBuilder()
+      .withNewMetadata()
+      .withName(stepType)
+      .endMetadata()
+      .build())
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org