You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/18 21:33:51 UTC

[GitHub] asfgit closed pull request #22911: [SPARK-25815][k8s] Support kerberos in client mode, keytab-based token renewal.

asfgit closed pull request #22911: [SPARK-25815][k8s] Support kerberos in client mode, keytab-based token renewal.
URL: https://github.com/apache/spark/pull/22911
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d4055cb6c5853..763bd0a70a035 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
 
 import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
-import java.net.URL
+import java.net.{URI, URL}
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
 import java.util.UUID
@@ -334,19 +334,20 @@ private[spark] class SparkSubmit extends Logging {
     val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
     val targetDir = Utils.createTempDir()
 
-    // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
-      if (args.principal != null) {
-        if (args.keytab != null) {
-          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
-          // Add keytab and principal configurations in sysProps to make them available
-          // for later use; e.g. in spark sql, the isolated class loader used to talk
-          // to HiveMetastore will use these settings. They will be set as Java system
-          // properties and then loaded by SparkConf
-          sparkConf.set(KEYTAB, args.keytab)
-          sparkConf.set(PRINCIPAL, args.principal)
-          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-        }
+    // Kerberos is not supported in standalone mode, and keytab support is not yet available
+    // in Mesos cluster mode.
+    if (clusterManager != STANDALONE
+        && !isMesosCluster
+        && args.principal != null
+        && args.keytab != null) {
+      // If client mode, make sure the keytab is just a local path.
+      if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
+        args.keytab = new URI(args.keytab).getPath()
+      }
+
+      if (!Utils.isLocalUri(args.keytab)) {
+        require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 126a6ab801369..f7e3ddecee093 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.security
 
 import java.io.File
+import java.net.URI
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
@@ -71,11 +72,13 @@ private[spark] class HadoopDelegationTokenManager(
   private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
 
   private val principal = sparkConf.get(PRINCIPAL).orNull
-  private val keytab = sparkConf.get(KEYTAB).orNull
+
+  // The keytab can be a local: URI for cluster mode, so translate it to a regular path. If it is
+  // needed later on, the code will check that it exists.
+  private val keytab = sparkConf.get(KEYTAB).map { uri => new URI(uri).getPath() }.orNull
 
   require((principal == null) == (keytab == null),
     "Both principal and keytab must be defined, or neither.")
-  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
 
   private val delegationTokenProviders = loadProviders()
   logDebug("Using the following builtin delegation token providers: " +
@@ -264,6 +267,7 @@ private[spark] class HadoopDelegationTokenManager(
 
   private def doLogin(): UserGroupInformation = {
     logInfo(s"Attempting to login to KDC using principal: $principal")
+    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
     val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
     logInfo("Successfully logged into KDC.")
     ugi
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b4ea1ee950217..e75fc6ed4f7af 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -92,6 +92,9 @@ private[spark] object Utils extends Logging {
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
+  /** Scheme used for files that are locally available on worker nodes in the cluster. */
+  val LOCAL_SCHEME = "local"
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -2829,6 +2832,11 @@ private[spark] object Utils extends Logging {
   def isClientMode(conf: SparkConf): Boolean = {
     "client".equals(conf.get(SparkLauncher.DEPLOY_MODE, "client"))
   }
+
+  /** Returns whether the URI is a "local:" URI. */
+  def isLocalUri(uri: String): Boolean = {
+    uri.startsWith(s"$LOCAL_SCHEME:")
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 85917b88e912a..76041e7de5182 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -87,25 +87,22 @@ private[spark] object Constants {
   val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
 
   // Hadoop Configuration
-  val HADOOP_FILE_VOLUME = "hadoop-properties"
+  val HADOOP_CONF_VOLUME = "hadoop-properties"
   val KRB_FILE_VOLUME = "krb5-file"
   val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
   val KRB_FILE_DIR_PATH = "/etc"
   val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
   val HADOOP_CONFIG_MAP_NAME =
     "spark.kubernetes.executor.hadoopConfigMapName"
-  val KRB5_CONFIG_MAP_NAME =
-    "spark.kubernetes.executor.krb5ConfigMapName"
 
   // Kerberos Configuration
-  val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens"
   val KERBEROS_DT_SECRET_NAME =
     "spark.kubernetes.kerberos.dt-secret-name"
   val KERBEROS_DT_SECRET_KEY =
     "spark.kubernetes.kerberos.dt-secret-key"
-  val KERBEROS_SPARK_USER_NAME =
-    "spark.kubernetes.kerberos.spark-user-name"
   val KERBEROS_SECRET_KEY = "hadoop-tokens"
+  val KERBEROS_KEYTAB_VOLUME = "kerberos-keytab"
+  val KERBEROS_KEYTAB_MOUNT_POINT = "/mnt/secrets/kerberos-keytab"
 
   // Hadoop credentials secrets for the Spark app.
   val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index a06c21b47f15e..6febad981af56 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -42,10 +42,6 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
 
   def appName: String = get("spark.app.name", "spark")
 
-  def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config"
-
-  def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file"
-
   def namespace: String = get(KUBERNETES_NAMESPACE)
 
   def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
index 345dd117fd35f..fd1196368a7ff 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
@@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
 
-private[spark] case class SparkPod(pod: Pod, container: Container)
+private[spark] case class SparkPod(pod: Pod, container: Container) {
+
+  /**
+   * Convenience method to apply a series of chained transformations to a pod.
+   *
+   * Use it like:
+   *
+   *     original.modify { case pod =>
+   *       // update pod and return new one
+   *     }.modify { case pod =>
+   *       // more changes that create a new pod
+   *     }.modify {
+   *       case pod if someCondition => // new pod
+   *     }
+   *
+   * This makes it cleaner to apply multiple transformations, avoiding having to create
+   * a bunch of awkwardly-named local variables. Since the argument is a partial function,
+   * it can do matching without needing to exhaust all the possibilities. If the function
+   * is not applied, then the original pod will be kept.
+   */
+  def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = fn.lift(this).getOrElse(this)
+
+}
+
 
 private[spark] object SparkPod {
   def initialPod(): SparkPod = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index d8cf3653d3226..8362c14fb289d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -110,6 +110,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
         .withContainerPort(driverUIPort)
         .withProtocol("TCP")
         .endPort()
+      .addNewEnv()
+        .withName(ENV_SPARK_USER)
+        .withValue(Utils.getCurrentUserName())
+        .endEnv()
       .addAllToEnv(driverCustomEnvs.asJava)
       .addNewEnv()
         .withName(ENV_DRIVER_BIND_ADDRESS)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 4bcf4c9446aa3..c8bf7cdb4224f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -163,6 +163,10 @@ private[spark] class BasicExecutorFeatureStep(
         .addToLimits("memory", executorMemoryQuantity)
         .addToRequests("cpu", executorCpuQuantity)
         .endResources()
+        .addNewEnv()
+          .withName(ENV_SPARK_USER)
+          .withValue(Utils.getCurrentUserName())
+          .endEnv()
       .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
new file mode 100644
index 0000000000000..d602ed5481e65
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+/**
+ * Mounts the Hadoop configuration - either a pre-defined config map, or a local configuration
+ * directory - on the driver pod.
+ */
+private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
+  extends KubernetesFeatureConfigStep {
+
+  private val confDir = Option(conf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
+  private val existingConfMap = conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
+
+  KubernetesUtils.requireNandDefined(
+    confDir,
+    existingConfMap,
+    "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
+    "as the creation of an additional ConfigMap, when one is already specified is extraneous")
+
+  private lazy val confFiles: Seq[File] = {
+    val dir = new File(confDir.get)
+    if (dir.isDirectory) {
+      dir.listFiles.filter(_.isFile).toSeq
+    } else {
+      Nil
+    }
+  }
+
+  private def newConfigMapName: String = s"${conf.resourceNamePrefix}-hadoop-config"
+
+  private def hasHadoopConf: Boolean = confDir.isDefined || existingConfMap.isDefined
+
+  override def configurePod(original: SparkPod): SparkPod = {
+    original.transform { case pod if hasHadoopConf =>
+      val confVolume = if (confDir.isDefined) {
+        val keyPaths = confFiles.map { file =>
+          new KeyToPathBuilder()
+            .withKey(file.getName())
+            .withPath(file.getName())
+            .build()
+        }
+        new VolumeBuilder()
+          .withName(HADOOP_CONF_VOLUME)
+          .withNewConfigMap()
+            .withName(newConfigMapName)
+            .withItems(keyPaths.asJava)
+            .endConfigMap()
+          .build()
+      } else {
+        new VolumeBuilder()
+          .withName(HADOOP_CONF_VOLUME)
+          .withNewConfigMap()
+            .withName(existingConfMap.get)
+            .endConfigMap()
+          .build()
+      }
+
+      val podWithConf = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolumeLike(confVolume)
+            .endVolume()
+          .endSpec()
+          .build()
+
+      val containerWithMount = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(HADOOP_CONF_VOLUME)
+          .withMountPath(HADOOP_CONF_DIR_PATH)
+          .endVolumeMount()
+        .addNewEnv()
+          .withName(ENV_HADOOP_CONF_DIR)
+          .withValue(HADOOP_CONF_DIR_PATH)
+          .endEnv()
+        .build()
+
+      SparkPod(podWithConf, containerWithMount)
+    }
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    if (confDir.isDefined) {
+      val fileMap = confFiles.map { file =>
+        (file.getName(), Files.toString(file, StandardCharsets.UTF_8))
+      }.toMap.asJava
+
+      Seq(new ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(newConfigMapName)
+          .endMetadata()
+        .addToData(fileMap)
+        .build())
+    } else {
+      Nil
+    }
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
deleted file mode 100644
index da332881ae1a2..0000000000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features
-
-import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
-import org.apache.spark.internal.Logging
-
-/**
- * This step is responsible for bootstraping the container with ConfigMaps
- * containing Hadoop config files mounted as volumes and an ENV variable
- * pointed to the mounted file directory.
- */
-private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesExecutorConf)
-  extends KubernetesFeatureConfigStep with Logging {
-
-  override def configurePod(pod: SparkPod): SparkPod = {
-    val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
-    if (hadoopConfDirCMapName.isDefined) {
-      HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
-    } else {
-      pod
-    }
-  }
-}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
deleted file mode 100644
index c038e75491ca5..0000000000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features
-
-import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
-
-/**
- * This step is responsible for setting ENV_SPARK_USER when HADOOP_FILES are detected
- * however, this step would not be run if Kerberos is enabled, as Kerberos sets SPARK_USER
- */
-private[spark] class HadoopSparkUserExecutorFeatureStep(conf: KubernetesExecutorConf)
-  extends KubernetesFeatureConfigStep {
-
-  override def configurePod(pod: SparkPod): SparkPod = {
-    conf.getOption(KERBEROS_SPARK_USER_NAME).map { user =>
-      HadoopBootstrapUtil.bootstrapSparkUserPod(user, pod)
-    }.getOrElse(pod)
-  }
-}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
index c6d5a866fa7bc..721d7e97b21f8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
@@ -16,31 +16,40 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import io.fabric8.kubernetes.api.model.{HasMetadata, Secret, SecretBuilder}
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
 import org.apache.commons.codec.binary.Base64
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.hadooputils._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
 
 /**
- * Runs the necessary Hadoop-based logic based on Kerberos configs and the presence of the
- * HADOOP_CONF_DIR. This runs various bootstrap methods defined in HadoopBootstrapUtil.
+ * Provide kerberos / service credentials to the Spark driver.
+ *
+ * There are three use cases, in order of precedence:
+ *
+ * - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will
+ *   manage the kerberos login and the creation of delegation tokens.
+ * - existing tokens: if a secret containing delegation tokens is provided, it will be mounted
+ *   on the driver pod, and the driver will handle distribution of those tokens to executors.
+ * - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation
+ *   tokens which will be provided to the driver. The driver will handle distribution of the
+ *   tokens to executors.
  */
 private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDriverConf)
-  extends KubernetesFeatureConfigStep {
-
-  private val hadoopConfDir = Option(kubernetesConf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
-  private val hadoopConfigMapName = kubernetesConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
-  KubernetesUtils.requireNandDefined(
-    hadoopConfDir,
-    hadoopConfigMapName,
-    "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
-    "as the creation of an additional ConfigMap, when one is already specified is extraneous")
+  extends KubernetesFeatureConfigStep with Logging {
 
   private val principal = kubernetesConf.get(org.apache.spark.internal.config.PRINCIPAL)
   private val keytab = kubernetesConf.get(org.apache.spark.internal.config.KEYTAB)
@@ -49,15 +58,6 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
   private val krb5File = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_FILE)
   private val krb5CMap = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP)
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf)
-  private val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf, hadoopConf)
-  private val isKerberosEnabled =
-    (hadoopConfDir.isDefined && UserGroupInformation.isSecurityEnabled) ||
-      (hadoopConfigMapName.isDefined && (krb5File.isDefined || krb5CMap.isDefined))
-  require(keytab.isEmpty || isKerberosEnabled,
-    "You must enable Kerberos support if you are specifying a Kerberos Keytab")
-
-  require(existingSecretName.isEmpty || isKerberosEnabled,
-    "You must enable Kerberos support if you are specifying a Kerberos Secret")
 
   KubernetesUtils.requireNandDefined(
     krb5File,
@@ -79,128 +79,183 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
     "If a secret storing a Kerberos Delegation Token is specified you must also" +
       " specify the item-key where the data is stored")
 
-  private val hadoopConfigurationFiles = hadoopConfDir.map { hConfDir =>
-    HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
+  if (!hasKerberosConf) {
+    logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " +
+      "Make sure that you have the krb5.conf locally on the driver image.")
   }
-  private val newHadoopConfigMapName =
-    if (hadoopConfigMapName.isEmpty) {
-      Some(kubernetesConf.hadoopConfigMapName)
-    } else {
-      None
-    }
 
-  // Either use pre-existing secret or login to create new Secret with DT stored within
-  private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
-    secretName <- existingSecretName
-    secretItemKey <- existingSecretItemKey
-  } yield {
-    KerberosConfigSpec(
-      dtSecret = None,
-      dtSecretName = secretName,
-      dtSecretItemKey = secretItemKey,
-      jobUserName = UserGroupInformation.getCurrentUser.getShortUserName)
-  }).orElse(
-    if (isKerberosEnabled) {
-      Some(buildKerberosSpec())
+  // Create delegation tokens if needed. This is a lazy val so that it's not populated
+  // unnecessarily. But it needs to be accessible to different methods in this class,
+  // since it's not clear based solely on available configuration options that delegation
+  // tokens are needed when other credentials are not available.
+  private lazy val delegationTokens: Array[Byte] = {
+    if (keytab.isEmpty && existingSecretName.isEmpty) {
+      val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
+        SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf))
+      val creds = UserGroupInformation.getCurrentUser().getCredentials()
+      tokenManager.obtainDelegationTokens(creds)
+      // If no tokens and no secrets are stored in the credentials, make sure nothing is returned,
+      // to avoid creating an unnecessary secret.
+      if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
+        SparkHadoopUtil.get.serialize(creds)
+      } else {
+        null
+      }
     } else {
-      None
+      null
     }
-  )
+  }
 
-  override def configurePod(pod: SparkPod): SparkPod = {
-    if (!isKerberosEnabled) {
-      return pod
-    }
+  private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_))
 
-    val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir(
-      hadoopConfDir,
-      newHadoopConfigMapName,
-      hadoopConfigMapName,
-      pod)
-    kerberosConfSpec.map { hSpec =>
-      HadoopBootstrapUtil.bootstrapKerberosPod(
-        hSpec.dtSecretName,
-        hSpec.dtSecretItemKey,
-        hSpec.jobUserName,
-        krb5File,
-        Some(kubernetesConf.krbConfigMapName),
-        krb5CMap,
-        hadoopBasedSparkPod)
-    }.getOrElse(
-      HadoopBootstrapUtil.bootstrapSparkUserPod(
-        UserGroupInformation.getCurrentUser.getShortUserName,
-        hadoopBasedSparkPod))
-  }
+  private def dtSecretName: String = s"${kubernetesConf.resourceNamePrefix}-delegation-tokens"
 
-  override def getAdditionalPodSystemProperties(): Map[String, String] = {
-    if (!isKerberosEnabled) {
-      return Map.empty
-    }
+  private def ktSecretName: String = s"${kubernetesConf.resourceNamePrefix}-kerberos-keytab"
 
-    val resolvedConfValues = kerberosConfSpec.map { hSpec =>
-      Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName,
-        KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey,
-        KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName,
-        KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName))
-      }.getOrElse(
-        Map(KERBEROS_SPARK_USER_NAME ->
-          UserGroupInformation.getCurrentUser.getShortUserName))
-    Map(HADOOP_CONFIG_MAP_NAME ->
-      hadoopConfigMapName.getOrElse(kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues
-  }
+  private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined
 
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    if (!isKerberosEnabled) {
-      return Seq.empty
-    }
+  private def newConfigMapName: String = s"${kubernetesConf.resourceNamePrefix}-krb5-file"
 
-    val hadoopConfConfigMap = for {
-      hName <- newHadoopConfigMapName
-      hFiles <- hadoopConfigurationFiles
-    } yield {
-      HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles)
-    }
+  override def configurePod(original: SparkPod): SparkPod = {
+    original.transform { case pod if hasKerberosConf =>
+      val configMapVolume = if (krb5CMap.isDefined) {
+        new VolumeBuilder()
+          .withName(KRB_FILE_VOLUME)
+          .withNewConfigMap()
+            .withName(krb5CMap.get)
+            .endConfigMap()
+          .build()
+      } else {
+        val krb5Conf = new File(krb5File.get)
+        new VolumeBuilder()
+          .withName(KRB_FILE_VOLUME)
+          .withNewConfigMap()
+          .withName(newConfigMapName)
+          .withItems(new KeyToPathBuilder()
+            .withKey(krb5Conf.getName())
+            .withPath(krb5Conf.getName())
+            .build())
+          .endConfigMap()
+          .build()
+      }
 
-    val krb5ConfigMap = krb5File.map { fileLocation =>
-      HadoopBootstrapUtil.buildkrb5ConfigMap(
-        kubernetesConf.krbConfigMapName,
-        fileLocation)
-    }
+      val podWithVolume = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolumeLike(configMapVolume)
+            .endVolume()
+          .endSpec()
+        .build()
+
+      val containerWithMount = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(KRB_FILE_VOLUME)
+          .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
+          .withSubPath("krb5.conf")
+          .endVolumeMount()
+        .build()
+
+      SparkPod(podWithVolume, containerWithMount)
+    }.transform {
+      case pod if needKeytabUpload =>
+        // If keytab is defined and is a submission-local file (not local: URI), then create a
+        // secret for it. The keytab data will be stored in this secret below.
+        val podWitKeytab = new PodBuilder(pod.pod)
+          .editOrNewSpec()
+            .addNewVolume()
+              .withName(KERBEROS_KEYTAB_VOLUME)
+              .withNewSecret()
+                .withSecretName(ktSecretName)
+                .endSecret()
+              .endVolume()
+            .endSpec()
+          .build()
+
+        val containerWithKeytab = new ContainerBuilder(pod.container)
+          .addNewVolumeMount()
+            .withName(KERBEROS_KEYTAB_VOLUME)
+            .withMountPath(KERBEROS_KEYTAB_MOUNT_POINT)
+            .endVolumeMount()
+          .build()
+
+        SparkPod(podWitKeytab, containerWithKeytab)
+
+      case pod if existingSecretName.isDefined | delegationTokens != null =>
+        val secretName = existingSecretName.getOrElse(dtSecretName)
+        val itemKey = existingSecretItemKey.getOrElse(KERBEROS_SECRET_KEY)
+
+        val podWithTokens = new PodBuilder(pod.pod)
+          .editOrNewSpec()
+            .addNewVolume()
+              .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+              .withNewSecret()
+                .withSecretName(secretName)
+                .endSecret()
+              .endVolume()
+            .endSpec()
+          .build()
 
-    val kerberosDTSecret = kerberosConfSpec.flatMap(_.dtSecret)
+        val containerWithTokens = new ContainerBuilder(pod.container)
+          .addNewVolumeMount()
+            .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+            .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
+            .endVolumeMount()
+          .addNewEnv()
+            .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
+            .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$itemKey")
+            .endEnv()
+          .build()
 
-    hadoopConfConfigMap.toSeq ++
-      krb5ConfigMap.toSeq ++
-      kerberosDTSecret.toSeq
+        SparkPod(podWithTokens, containerWithTokens)
+    }
   }
 
-  private def buildKerberosSpec(): KerberosConfigSpec = {
-    // The JobUserUGI will be taken fom the Local Ticket Cache or via keytab+principal
-    // The login happens in the SparkSubmit so login logic is not necessary to include
-    val jobUserUGI = UserGroupInformation.getCurrentUser
-    val creds = jobUserUGI.getCredentials
-    tokenManager.obtainDelegationTokens(creds)
-    val tokenData = SparkHadoopUtil.get.serialize(creds)
-    require(tokenData.nonEmpty, "Did not obtain any delegation tokens")
-    val newSecretName =
-      s"${kubernetesConf.resourceNamePrefix}-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME"
-    val secretDT =
-      new SecretBuilder()
-        .withNewMetadata()
-          .withName(newSecretName)
-          .endMetadata()
-        .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(tokenData))
-        .build()
-    KerberosConfigSpec(
-      dtSecret = Some(secretDT),
-      dtSecretName = newSecretName,
-      dtSecretItemKey = KERBEROS_SECRET_KEY,
-      jobUserName = jobUserUGI.getShortUserName)
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    // If a submission-local keytab is provided, update the Spark config so that it knows the
+    // path of the keytab in the driver container.
+    if (needKeytabUpload) {
+      val ktName = new File(keytab.get).getName()
+      Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName")
+    } else {
+      Map.empty
+    }
   }
 
-  private case class KerberosConfigSpec(
-      dtSecret: Option[Secret],
-      dtSecretName: String,
-      dtSecretItemKey: String,
-      jobUserName: String)
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    Seq[HasMetadata]() ++ {
+      krb5File.map { path =>
+        val file = new File(path)
+        new ConfigMapBuilder()
+          .withNewMetadata()
+            .withName(newConfigMapName)
+            .endMetadata()
+          .addToData(
+            Map(file.getName() -> Files.toString(file, StandardCharsets.UTF_8)).asJava)
+          .build()
+      }
+    } ++ {
+      // If a submission-local keytab is provided, stash it in a secret.
+      if (needKeytabUpload) {
+        val kt = new File(keytab.get)
+        Seq(new SecretBuilder()
+          .withNewMetadata()
+            .withName(ktSecretName)
+            .endMetadata()
+          .addToData(kt.getName(), Base64.encodeBase64String(Files.toByteArray(kt)))
+          .build())
+      } else {
+        Nil
+      }
+    } ++ {
+      if (delegationTokens != null) {
+        Seq(new SecretBuilder()
+          .withNewMetadata()
+            .withName(dtSecretName)
+            .endMetadata()
+          .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(delegationTokens))
+          .build())
+      } else {
+        Nil
+      }
+    }
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
deleted file mode 100644
index 907271b1cb483..0000000000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features
-
-import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
-import org.apache.spark.internal.Logging
-
-/**
- * This step is responsible for mounting the DT secret for the executors
- */
-private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf)
-  extends KubernetesFeatureConfigStep with Logging {
-
-  override def configurePod(pod: SparkPod): SparkPod = {
-    val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
-    if (maybeKrb5CMap.isDefined) {
-      logInfo(s"Mounting Resources for Kerberos")
-      HadoopBootstrapUtil.bootstrapKerberosPod(
-        conf.get(KERBEROS_DT_SECRET_NAME),
-        conf.get(KERBEROS_DT_SECRET_KEY),
-        conf.get(KERBEROS_SPARK_USER_NAME),
-        None,
-        None,
-        maybeKrb5CMap,
-        pod)
-    } else {
-      pod
-    }
-  }
-}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala
deleted file mode 100644
index 5bee766caf2be..0000000000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.hadooputils
-
-import java.io.File
-import java.nio.charset.StandardCharsets
-
-import scala.collection.JavaConverters._
-
-import com.google.common.io.Files
-import io.fabric8.kubernetes.api.model._
-
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.SparkPod
-import org.apache.spark.internal.Logging
-
-private[spark] object HadoopBootstrapUtil extends Logging {
-
-  /**
-   * Mounting the DT secret for both the Driver and the executors
-   *
-   * @param dtSecretName Name of the secret that stores the Delegation Token
-   * @param dtSecretItemKey Name of the Item Key storing the Delegation Token
-   * @param userName Name of the SparkUser to set SPARK_USER
-   * @param fileLocation Optional Location of the krb5 file
-   * @param newKrb5ConfName Optional location of the ConfigMap for Krb5
-   * @param existingKrb5ConfName Optional name of ConfigMap for Krb5
-   * @param pod Input pod to be appended to
-   * @return a modified SparkPod
-   */
-  def bootstrapKerberosPod(
-      dtSecretName: String,
-      dtSecretItemKey: String,
-      userName: String,
-      fileLocation: Option[String],
-      newKrb5ConfName: Option[String],
-      existingKrb5ConfName: Option[String],
-      pod: SparkPod): SparkPod = {
-
-    val preConfigMapVolume = existingKrb5ConfName.map { kconf =>
-      new VolumeBuilder()
-        .withName(KRB_FILE_VOLUME)
-        .withNewConfigMap()
-          .withName(kconf)
-          .endConfigMap()
-        .build()
-    }
-
-    val createConfigMapVolume = for {
-      fLocation <- fileLocation
-      krb5ConfName <- newKrb5ConfName
-    } yield {
-      val krb5File = new File(fLocation)
-      val fileStringPath = krb5File.toPath.getFileName.toString
-      new VolumeBuilder()
-        .withName(KRB_FILE_VOLUME)
-        .withNewConfigMap()
-        .withName(krb5ConfName)
-        .withItems(new KeyToPathBuilder()
-          .withKey(fileStringPath)
-          .withPath(fileStringPath)
-          .build())
-        .endConfigMap()
-        .build()
-    }
-
-    // Breaking up Volume creation for clarity
-    val configMapVolume = preConfigMapVolume.orElse(createConfigMapVolume)
-    if (configMapVolume.isEmpty) {
-       logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " +
-         "Make sure that you have the krb5.conf locally on the Driver and Executor images")
-    }
-
-    val kerberizedPodWithDTSecret = new PodBuilder(pod.pod)
-      .editOrNewSpec()
-        .addNewVolume()
-          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
-          .withNewSecret()
-            .withSecretName(dtSecretName)
-            .endSecret()
-          .endVolume()
-        .endSpec()
-      .build()
-
-    // Optionally add the krb5.conf ConfigMap
-    val kerberizedPod = configMapVolume.map { cmVolume =>
-      new PodBuilder(kerberizedPodWithDTSecret)
-        .editSpec()
-          .addNewVolumeLike(cmVolume)
-            .endVolume()
-          .endSpec()
-        .build()
-    }.getOrElse(kerberizedPodWithDTSecret)
-
-    val kerberizedContainerWithMounts = new ContainerBuilder(pod.container)
-      .addNewVolumeMount()
-        .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
-        .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
-        .endVolumeMount()
-      .addNewEnv()
-        .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
-        .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$dtSecretItemKey")
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_SPARK_USER)
-        .withValue(userName)
-        .endEnv()
-      .build()
-
-    // Optionally add the krb5.conf Volume Mount
-    val kerberizedContainer =
-      if (configMapVolume.isDefined) {
-        new ContainerBuilder(kerberizedContainerWithMounts)
-          .addNewVolumeMount()
-            .withName(KRB_FILE_VOLUME)
-            .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
-            .withSubPath("krb5.conf")
-            .endVolumeMount()
-          .build()
-      } else {
-        kerberizedContainerWithMounts
-      }
-
-    SparkPod(kerberizedPod, kerberizedContainer)
-  }
-
-  /**
-   * setting ENV_SPARK_USER when HADOOP_FILES are detected
-   *
-   * @param sparkUserName Name of the SPARK_USER
-   * @param pod Input pod to be appended to
-   * @return a modified SparkPod
-   */
-  def bootstrapSparkUserPod(sparkUserName: String, pod: SparkPod): SparkPod = {
-    val envModifiedContainer = new ContainerBuilder(pod.container)
-      .addNewEnv()
-        .withName(ENV_SPARK_USER)
-        .withValue(sparkUserName)
-        .endEnv()
-      .build()
-    SparkPod(pod.pod, envModifiedContainer)
-  }
-
-  /**
-   * Grabbing files in the HADOOP_CONF_DIR
-   *
-   * @param path location of HADOOP_CONF_DIR
-   * @return a list of File object
-   */
-  def getHadoopConfFiles(path: String): Seq[File] = {
-    val dir = new File(path)
-    if (dir.isDirectory) {
-      dir.listFiles.filter(_.isFile).toSeq
-    } else {
-      Seq.empty[File]
-    }
-  }
-
-  /**
-   * Bootstraping the container with ConfigMaps that store
-   * Hadoop configuration files
-   *
-   * @param hadoopConfDir directory location of HADOOP_CONF_DIR env
-   * @param newHadoopConfigMapName name of the new configMap for HADOOP_CONF_DIR
-   * @param existingHadoopConfigMapName name of the pre-defined configMap for HADOOP_CONF_DIR
-   * @param pod Input pod to be appended to
-   * @return a modified SparkPod
-   */
-  def bootstrapHadoopConfDir(
-      hadoopConfDir: Option[String],
-      newHadoopConfigMapName: Option[String],
-      existingHadoopConfigMapName: Option[String],
-      pod: SparkPod): SparkPod = {
-    val preConfigMapVolume = existingHadoopConfigMapName.map { hConf =>
-      new VolumeBuilder()
-        .withName(HADOOP_FILE_VOLUME)
-        .withNewConfigMap()
-          .withName(hConf)
-          .endConfigMap()
-        .build() }
-
-    val createConfigMapVolume = for {
-      dirLocation <- hadoopConfDir
-      hConfName <- newHadoopConfigMapName
-    } yield {
-      val hadoopConfigFiles = getHadoopConfFiles(dirLocation)
-      val keyPaths = hadoopConfigFiles.map { file =>
-        val fileStringPath = file.toPath.getFileName.toString
-        new KeyToPathBuilder()
-          .withKey(fileStringPath)
-          .withPath(fileStringPath)
-          .build()
-      }
-      new VolumeBuilder()
-        .withName(HADOOP_FILE_VOLUME)
-        .withNewConfigMap()
-          .withName(hConfName)
-          .withItems(keyPaths.asJava)
-          .endConfigMap()
-        .build()
-    }
-
-    // Breaking up Volume Creation for clarity
-    val configMapVolume = preConfigMapVolume.getOrElse(createConfigMapVolume.get)
-
-    val hadoopSupportedPod = new PodBuilder(pod.pod)
-      .editSpec()
-        .addNewVolumeLike(configMapVolume)
-          .endVolume()
-        .endSpec()
-        .build()
-
-    val hadoopSupportedContainer = new ContainerBuilder(pod.container)
-      .addNewVolumeMount()
-        .withName(HADOOP_FILE_VOLUME)
-        .withMountPath(HADOOP_CONF_DIR_PATH)
-        .endVolumeMount()
-      .addNewEnv()
-        .withName(ENV_HADOOP_CONF_DIR)
-        .withValue(HADOOP_CONF_DIR_PATH)
-        .endEnv()
-      .build()
-    SparkPod(hadoopSupportedPod, hadoopSupportedContainer)
-  }
-
-  /**
-   * Builds ConfigMap given the file location of the
-   * krb5.conf file
-   *
-   * @param configMapName name of configMap for krb5
-   * @param fileLocation location of krb5 file
-   * @return a ConfigMap
-   */
-  def buildkrb5ConfigMap(
-      configMapName: String,
-      fileLocation: String): ConfigMap = {
-    val file = new File(fileLocation)
-    new ConfigMapBuilder()
-      .withNewMetadata()
-        .withName(configMapName)
-        .endMetadata()
-      .addToData(Map(file.toPath.getFileName.toString ->
-        Files.toString(file, StandardCharsets.UTF_8)).asJava)
-      .build()
-  }
-
-  /**
-   * Builds ConfigMap given the ConfigMap name
-   * and a list of Hadoop Conf files
-   *
-   * @param hadoopConfigMapName name of hadoopConfigMap
-   * @param hadoopConfFiles list of hadoopFiles
-   * @return a ConfigMap
-   */
-  def buildHadoopConfigMap(
-      hadoopConfigMapName: String,
-      hadoopConfFiles: Seq[File]): ConfigMap = {
-    new ConfigMapBuilder()
-      .withNewMetadata()
-        .withName(hadoopConfigMapName)
-        .endMetadata()
-      .addToData(hadoopConfFiles.map { file =>
-        (file.toPath.getFileName.toString,
-          Files.toString(file, StandardCharsets.UTF_8))
-        }.toMap.asJava)
-      .build()
-  }
-
-}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala
deleted file mode 100644
index 7f7ef216cf485..0000000000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.hadooputils
-
-import io.fabric8.kubernetes.api.model.Secret
-
-/**
- * Represents a given configuration of the Kerberos Configuration logic
- * <p>
- * - The secret containing a DT, either previously specified or built on the fly
- * - The name of the secret where the DT will be stored
- * - The data item-key on the secret which correlates with where the current DT data is stored
- * - The Job User's username
- */
-private[spark] case class KerberosConfigSpec(
-    dtSecret: Option[Secret],
-    dtSecretName: String,
-    dtSecretItemKey: String,
-    jobUserName: String)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index d2c0ced9fa2f4..57e4060bc85b9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -46,6 +46,7 @@ private[spark] class KubernetesDriverBuilder {
       new LocalDirsFeatureStep(conf),
       new MountVolumesFeatureStep(conf),
       new DriverCommandFeatureStep(conf),
+      new HadoopConfDriverFeatureStep(conf),
       new KerberosConfDriverFeatureStep(conf),
       new PodTemplateConfigMapStep(conf))
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 68f6f2e46e316..44a97e117b2ca 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -18,12 +18,14 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.ExecutorService
 
-import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import io.fabric8.kubernetes.client.KubernetesClient
+
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -129,7 +131,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
-    new KubernetesDriverEndpoint(rpcEnv, properties)
+    new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  }
+
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
   }
 
   private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 0b74966fe8685..48aa2c56d4d69 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -44,10 +44,7 @@ private[spark] class KubernetesExecutorBuilder {
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
       new LocalDirsFeatureStep(conf),
-      new MountVolumesFeatureStep(conf),
-      new HadoopConfExecutorFeatureStep(conf),
-      new KerberosConfExecutorFeatureStep(conf),
-      new HadoopSparkUserExecutorFeatureStep(conf))
+      new MountVolumesFeatureStep(conf))
 
     features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index e4951bc1e69ed..5ceb9d6d6fcd0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
@@ -73,7 +74,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val foundPortNames = configuredPod.container.getPorts.asScala.toSet
     assert(expectedPortNames === foundPortNames)
 
-    assert(configuredPod.container.getEnv.size === 3)
     val envs = configuredPod.container
       .getEnv
       .asScala
@@ -82,6 +82,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     DRIVER_ENVS.foreach { case (k, v) =>
       assert(envs(v) === v)
     }
+    assert(envs(ENV_SPARK_USER) === Utils.getCurrentUserName())
 
     assert(configuredPod.pod.getSpec().getImagePullSecrets.asScala ===
       TEST_IMAGE_PULL_SECRET_OBJECTS)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 05989d9be7ad5..c2efab01e4248 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -200,7 +200,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ENV_EXECUTOR_MEMORY -> "1g",
       ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
       ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
-      ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
+      ENV_EXECUTOR_POD_IP -> null,
+      ENV_SPARK_USER -> Utils.getCurrentUserName())
 
     val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
     val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
@@ -208,9 +209,11 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt
     }.toMap
 
-    val mapEnvs = executorPod.container.getEnv.asScala.map {
+    val containerEnvs = executorPod.container.getEnv.asScala.map {
       x => (x.getName, x.getValue)
     }.toMap
-    assert((defaultEnvs ++ extraJavaOptsEnvs) === mapEnvs)
+
+    val expectedEnvs = defaultEnvs ++ additionalEnvVars ++ extraJavaOptsEnvs
+    assert(containerEnvs === expectedEnvs)
   }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
new file mode 100644
index 0000000000000..e1c01dbdc7358
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.ConfigMap
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
+
+class HadoopConfDriverFeatureStepSuite extends SparkFunSuite {
+
+  import KubernetesFeaturesTestUtils._
+  import SecretVolumeUtils._
+
+  test("mount hadoop config map if defined") {
+    val sparkConf = new SparkConf(false)
+      .set(Config.KUBERNETES_HADOOP_CONF_CONFIG_MAP, "testConfigMap")
+    val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
+    val step = new HadoopConfDriverFeatureStep(conf)
+    checkPod(step.configurePod(SparkPod.initialPod()))
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  test("create hadoop config map if config dir is defined") {
+    val confDir = Utils.createTempDir()
+    val confFiles = Set("core-site.xml", "hdfs-site.xml")
+
+    confFiles.foreach { f =>
+      Files.write("some data", new File(confDir, f), UTF_8)
+    }
+
+    val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath()))
+    val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
+
+    val step = new HadoopConfDriverFeatureStep(conf)
+    checkPod(step.configurePod(SparkPod.initialPod()))
+
+    val hadoopConfMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head
+    assert(hadoopConfMap.getData().keySet().asScala === confFiles)
+  }
+
+  private def checkPod(pod: SparkPod): Unit = {
+    assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME))
+    assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH))
+    assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR))
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
new file mode 100644
index 0000000000000..41ca3a94ce7a7
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.{ConfigMap, Secret}
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+class KerberosConfDriverFeatureStepSuite extends SparkFunSuite {
+
+  import KubernetesFeaturesTestUtils._
+  import SecretVolumeUtils._
+
+  private val tmpDir = Utils.createTempDir()
+
+  test("mount krb5 config map if defined") {
+    val configMap = "testConfigMap"
+    val step = createStep(
+      new SparkConf(false).set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, configMap))
+
+    checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), configMap)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(filter[ConfigMap](step.getAdditionalKubernetesResources()).isEmpty)
+  }
+
+  test("create krb5.conf config map if local config provided") {
+    val krbConf = File.createTempFile("krb5", ".conf", tmpDir)
+    Files.write("some data", krbConf, UTF_8)
+
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath())
+    val step = createStep(sparkConf)
+
+    val confMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head
+    assert(confMap.getData().keySet().asScala === Set(krbConf.getName()))
+
+    checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), confMap.getMetadata().getName())
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+  }
+
+  test("create keytab secret if client keytab file used") {
+    val keytab = File.createTempFile("keytab", ".bin", tmpDir)
+    Files.write("some data", keytab, UTF_8)
+
+    val sparkConf = new SparkConf(false)
+      .set(KEYTAB, keytab.getAbsolutePath())
+      .set(PRINCIPAL, "alice")
+    val step = createStep(sparkConf)
+
+    val pod = step.configurePod(SparkPod.initialPod())
+    assert(podHasVolume(pod.pod, KERBEROS_KEYTAB_VOLUME))
+    assert(containerHasVolume(pod.container, KERBEROS_KEYTAB_VOLUME, KERBEROS_KEYTAB_MOUNT_POINT))
+
+    assert(step.getAdditionalPodSystemProperties().keys === Set(KEYTAB.key))
+
+    val secret = filter[Secret](step.getAdditionalKubernetesResources()).head
+    assert(secret.getData().keySet().asScala === Set(keytab.getName()))
+  }
+
+  test("do nothing if container-local keytab used") {
+    val sparkConf = new SparkConf(false)
+      .set(KEYTAB, "local:/my.keytab")
+      .set(PRINCIPAL, "alice")
+    val step = createStep(sparkConf)
+
+    val initial = SparkPod.initialPod()
+    assert(step.configurePod(initial) === initial)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  test("mount delegation tokens if provided") {
+    val dtSecret = "tokenSecret"
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_KERBEROS_DT_SECRET_NAME, dtSecret)
+      .set(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY, "dtokens")
+    val step = createStep(sparkConf)
+
+    checkPodForTokens(step.configurePod(SparkPod.initialPod()), dtSecret)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  test("create delegation tokens if needed") {
+    // Since HadoopDelegationTokenManager does not create any tokens without proper configs and
+    // services, start with a test user that already has some tokens that will just be piped
+    // through to the driver.
+    val testUser = UserGroupInformation.createUserForTesting("k8s", Array())
+    testUser.doAs(new PrivilegedExceptionAction[Unit]() {
+      override def run(): Unit = {
+        val creds = testUser.getCredentials()
+        creds.addSecretKey(new Text("K8S_TEST_KEY"), Array[Byte](0x4, 0x2))
+        testUser.addCredentials(creds)
+
+        val tokens = SparkHadoopUtil.get.serialize(creds)
+
+        val step = createStep(new SparkConf(false))
+
+        val dtSecret = filter[Secret](step.getAdditionalKubernetesResources()).head
+        assert(dtSecret.getData().get(KERBEROS_SECRET_KEY) === Base64.encodeBase64String(tokens))
+
+        checkPodForTokens(step.configurePod(SparkPod.initialPod()),
+          dtSecret.getMetadata().getName())
+
+        assert(step.getAdditionalPodSystemProperties().isEmpty)
+      }
+    })
+  }
+
+  test("do nothing if no config and no tokens") {
+    val step = createStep(new SparkConf(false))
+    val initial = SparkPod.initialPod()
+    assert(step.configurePod(initial) === initial)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  private def checkPodForKrbConf(pod: SparkPod, confMapName: String): Unit = {
+    val podVolume = pod.pod.getSpec().getVolumes().asScala.find(_.getName() == KRB_FILE_VOLUME)
+    assert(podVolume.isDefined)
+    assert(containerHasVolume(pod.container, KRB_FILE_VOLUME, KRB_FILE_DIR_PATH + "/krb5.conf"))
+    assert(podVolume.get.getConfigMap().getName() === confMapName)
+  }
+
+  private def checkPodForTokens(pod: SparkPod, dtSecretName: String): Unit = {
+    val podVolume = pod.pod.getSpec().getVolumes().asScala
+      .find(_.getName() == SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+    assert(podVolume.isDefined)
+    assert(containerHasVolume(pod.container, SPARK_APP_HADOOP_SECRET_VOLUME_NAME,
+      SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR))
+    assert(containerHasEnvVar(pod.container, ENV_HADOOP_TOKEN_FILE_LOCATION))
+    assert(podVolume.get.getSecret().getSecretName() === dtSecretName)
+  }
+
+  private def createStep(conf: SparkConf): KerberosConfDriverFeatureStep = {
+    val kconf = KubernetesTestConf.createDriverConf(sparkConf = conf)
+    new KerberosConfDriverFeatureStep(kconf)
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
index f90380e30e52a..076b681be2397 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 import io.fabric8.kubernetes.api.model.{Container, HasMetadata, PodBuilder, SecretBuilder}
 import org.mockito.Matchers
@@ -63,4 +64,9 @@ object KubernetesFeaturesTestUtils {
   def containerHasEnvVar(container: Container, envVarName: String): Boolean = {
     container.getEnv.asScala.exists(envVar => envVar.getName == envVarName)
   }
+
+  def filter[T: ClassTag](list: Seq[HasMetadata]): Seq[T] = {
+    val desired = implicitly[ClassTag[T]].runtimeClass
+    list.filter(_.getClass() == desired).map(_.asInstanceOf[T]).toSeq
+  }
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6240f7b68d2c8..184fb6a8ad13e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -116,6 +116,8 @@ private[spark] class Client(
     }
   }
 
+  require(keytab == null || !Utils.isLocalUri(keytab), "Keytab should reference a local file.")
+
   private val launcherBackend = new LauncherBackend() {
     override protected def conf: SparkConf = sparkConf
 
@@ -472,7 +474,7 @@ private[spark] class Client(
         appMasterOnly: Boolean = false): (Boolean, String) = {
       val trimmedPath = path.trim()
       val localURI = Utils.resolveURI(trimmedPath)
-      if (localURI.getScheme != LOCAL_SCHEME) {
+      if (localURI.getScheme != Utils.LOCAL_SCHEME) {
         if (addDistributedUri(localURI)) {
           val localPath = getQualifiedLocalPath(localURI, hadoopConf)
           val linkname = targetDir.map(_ + "/").getOrElse("") +
@@ -515,7 +517,7 @@ private[spark] class Client(
     val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
     if (sparkArchive.isDefined) {
       val archive = sparkArchive.get
-      require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
+      require(!Utils.isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
       distribute(Utils.resolveURI(archive).toString,
         resType = LocalResourceType.ARCHIVE,
         destName = Some(LOCALIZED_LIB_DIR))
@@ -525,7 +527,7 @@ private[spark] class Client(
           // Break the list of jars to upload, and resolve globs.
           val localJars = new ArrayBuffer[String]()
           jars.foreach { jar =>
-            if (!isLocalUri(jar)) {
+            if (!Utils.isLocalUri(jar)) {
               val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
               val pathFs = FileSystem.get(path.toUri(), hadoopConf)
               pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
@@ -814,7 +816,7 @@ private[spark] class Client(
     }
     (pySparkArchives ++ pyArchives).foreach { path =>
       val uri = Utils.resolveURI(path)
-      if (uri.getScheme != LOCAL_SCHEME) {
+      if (uri.getScheme != Utils.LOCAL_SCHEME) {
         pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName())
       } else {
         pythonPath += uri.getPath()
@@ -1183,9 +1185,6 @@ private object Client extends Logging {
   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
 
-  // URI scheme that identifies local resources
-  val LOCAL_SCHEME = "local"
-
   // Staging directory for any temporary jars or files
   val SPARK_STAGING: String = ".sparkStaging"
 
@@ -1307,7 +1306,7 @@ private object Client extends Logging {
     addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env)
     if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
       sparkConf.get(SPARK_JARS).foreach { jars =>
-        jars.filter(isLocalUri).foreach { jar =>
+        jars.filter(Utils.isLocalUri).foreach { jar =>
           val uri = new URI(jar)
           addClasspathEntry(getClusterPath(sparkConf, uri.getPath()), env)
         }
@@ -1340,7 +1339,7 @@ private object Client extends Logging {
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
-      if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None
+      if (uri.getScheme == Utils.LOCAL_SCHEME) Some(uri) else None
     }.orElse(Some(new URI(APP_JAR_NAME)))
   }
 
@@ -1368,7 +1367,7 @@ private object Client extends Logging {
       uri: URI,
       fileName: String,
       env: HashMap[String, String]): Unit = {
-    if (uri != null && uri.getScheme == LOCAL_SCHEME) {
+    if (uri != null && uri.getScheme == Utils.LOCAL_SCHEME) {
       addClasspathEntry(getClusterPath(conf, uri.getPath), env)
     } else if (fileName != null) {
       addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env)
@@ -1489,11 +1488,6 @@ private object Client extends Logging {
     components.mkString(Path.SEPARATOR)
   }
 
-  /** Returns whether the URI is a "local:" URI. */
-  def isLocalUri(uri: String): Boolean = {
-    uri.startsWith(s"$LOCAL_SCHEME:")
-  }
-
   def createAppReport(report: ApplicationReport): YarnAppReport = {
     val diags = report.getDiagnostics()
     val diagsOpt = if (diags != null && diags.nonEmpty) Some(diags) else None
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index b3286e8fd824e..a6f57fcdb2461 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -100,7 +100,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
       val uri = new URI(entry)
-      if (LOCAL_SCHEME.equals(uri.getScheme())) {
+      if (Utils.LOCAL_SCHEME.equals(uri.getScheme())) {
         cp should contain (uri.getPath())
       } else {
         cp should not contain (uri.getPath())
@@ -136,7 +136,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       val expected = ADDED.split(",")
         .map(p => {
           val uri = new URI(p)
-          if (LOCAL_SCHEME == uri.getScheme()) {
+          if (Utils.LOCAL_SCHEME == uri.getScheme()) {
             p
           } else {
             Option(uri.getFragment()).getOrElse(new File(p).getName())
@@ -249,7 +249,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
 
-    sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
+    sparkConf.set(SPARK_ARCHIVE, Utils.LOCAL_SCHEME + ":" + archive.getPath())
     intercept[IllegalArgumentException] {
       client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org