You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mc...@apache.org on 2018/12/06 22:17:17 UTC

spark git commit: [SPARK-26194][K8S] Auto generate auth secret for k8s apps.

Repository: spark
Updated Branches:
  refs/heads/master b14a26ee5 -> dbd90e544


[SPARK-26194][K8S] Auto generate auth secret for k8s apps.

This change modifies the logic in the SecurityManager to do two
things:

- generate unique app secrets also when k8s is being used
- only store the secret in the user's UGI on YARN

The latter is needed so that k8s won't unnecessarily create
k8s secrets for the UGI credentials when only the auth token
is stored there.

On the k8s side, the secret is propagated to executors using
an environment variable instead. This ensures it works in both
client and cluster mode.

Security doc was updated to mention the feature and clarify that
proper access control in k8s should be enabled for it to be secure.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #23174 from vanzin/SPARK-26194.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbd90e54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbd90e54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbd90e54

Branch: refs/heads/master
Commit: dbd90e54408d593e02a3dd1e659fcf9a7b940535
Parents: b14a26e
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Dec 6 14:17:13 2018 -0800
Committer: mcheah <mc...@palantir.com>
Committed: Thu Dec 6 14:17:13 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/SecurityManager.scala      | 21 ++++-
 .../org/apache/spark/SecurityManagerSuite.scala | 57 +++++++-----
 docs/security.md                                | 34 ++++---
 .../k8s/features/BasicExecutorFeatureStep.scala | 96 ++++++++++++--------
 .../cluster/k8s/ExecutorPodsAllocator.scala     |  5 +-
 .../cluster/k8s/KubernetesClusterManager.scala  |  3 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala |  5 +-
 .../cluster/k8s/KubernetesExecutorBuilder.scala | 13 ++-
 .../BasicExecutorFeatureStepSuite.scala         | 46 +++++++---
 .../k8s/ExecutorPodsAllocatorSuite.scala        |  9 +-
 ...KubernetesClusterSchedulerBackendSuite.scala |  9 +-
 .../k8s/KubernetesExecutorBuilderSuite.scala    | 18 ++--
 .../k8s/integrationtest/KubernetesSuite.scala   |  2 +
 13 files changed, 205 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 3cfafeb..96e4b53 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -348,15 +348,23 @@ private[spark] class SecurityManager(
    */
   def initializeAuth(): Unit = {
     import SparkMasterRegex._
+    val k8sRegex = "k8s.*".r
 
     if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
       return
     }
 
+    // TODO: this really should be abstracted somewhere else.
     val master = sparkConf.get(SparkLauncher.SPARK_MASTER, "")
-    master match {
+    val storeInUgi = master match {
       case "yarn" | "local" | LOCAL_N_REGEX(_) | LOCAL_N_FAILURES_REGEX(_, _) =>
-        // Secret generation allowed here
+        true
+
+      case k8sRegex() =>
+        // Don't propagate the secret through the user's credentials in kubernetes. That conflicts
+        // with the way k8s handles propagation of delegation tokens.
+        false
+
       case _ =>
         require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
           s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
@@ -364,9 +372,12 @@ private[spark] class SecurityManager(
     }
 
     secretKey = Utils.createSecret(sparkConf)
-    val creds = new Credentials()
-    creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
-    UserGroupInformation.getCurrentUser().addCredentials(creds)
+
+    if (storeInUgi) {
+      val creds = new Credentials()
+      creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
+      UserGroupInformation.getCurrentUser().addCredentials(creds)
+    }
   }
 
   // Default SecurityManager only has a single secret key, so ignore appId.

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index e357299..eec8004 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -395,15 +395,23 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
     assert(keyFromEnv === new SecurityManager(conf2).getSecretKey())
   }
 
-  test("secret key generation") {
-    Seq(
-      ("yarn", true),
-      ("local", true),
-      ("local[*]", true),
-      ("local[1, 2]", true),
-      ("local-cluster[2, 1, 1024]", false),
-      ("invalid", false)
-    ).foreach { case (master, shouldGenerateSecret) =>
+  // How is the secret expected to be generated and stored.
+  object SecretTestType extends Enumeration {
+    val MANUAL, AUTO, UGI = Value
+  }
+
+  import SecretTestType._
+
+  Seq(
+    ("yarn", UGI),
+    ("local", UGI),
+    ("local[*]", UGI),
+    ("local[1, 2]", UGI),
+    ("k8s://127.0.0.1", AUTO),
+    ("local-cluster[2, 1, 1024]", MANUAL),
+    ("invalid", MANUAL)
+  ).foreach { case (master, secretType) =>
+    test(s"secret key generation: master '$master'") {
       val conf = new SparkConf()
         .set(NETWORK_AUTH_ENABLED, true)
         .set(SparkLauncher.SPARK_MASTER, master)
@@ -412,19 +420,26 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
       UserGroupInformation.createUserForTesting("authTest", Array()).doAs(
         new PrivilegedExceptionAction[Unit]() {
           override def run(): Unit = {
-            if (shouldGenerateSecret) {
-              mgr.initializeAuth()
-              val creds = UserGroupInformation.getCurrentUser().getCredentials()
-              val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
-              assert(secret != null)
-              assert(new String(secret, UTF_8) === mgr.getSecretKey())
-            } else {
-              intercept[IllegalArgumentException] {
+            secretType match {
+              case UGI =>
+                mgr.initializeAuth()
+                val creds = UserGroupInformation.getCurrentUser().getCredentials()
+                val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
+                assert(secret != null)
+                assert(new String(secret, UTF_8) === mgr.getSecretKey())
+
+              case AUTO =>
                 mgr.initializeAuth()
-              }
-              intercept[IllegalArgumentException] {
-                mgr.getSecretKey()
-              }
+                val creds = UserGroupInformation.getCurrentUser().getCredentials()
+                assert(creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY) === null)
+
+              case MANUAL =>
+                intercept[IllegalArgumentException] {
+                  mgr.initializeAuth()
+                }
+                intercept[IllegalArgumentException] {
+                  mgr.getSecretKey()
+                }
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
index be48346..2a4f3c0 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -26,21 +26,29 @@ not documented, Spark does not support.
 Spark currently supports authentication for RPC channels using a shared secret. Authentication can
 be turned on by setting the `spark.authenticate` configuration parameter.
 
-The exact mechanism used to generate and distribute the shared secret is deployment-specific.
+The exact mechanism used to generate and distribute the shared secret is deployment-specific. Unless
+specified below, the secret must be defined by setting the `spark.authenticate.secret` config
+option. The same secret is shared by all Spark applications and daemons in that case, which limits
+the security of these deployments, especially on multi-tenant clusters.
 
-For Spark on [YARN](running-on-yarn.html) and local deployments, Spark will automatically handle
-generating and distributing the shared secret. Each application will use a unique shared secret. In
+The REST Submission Server and the MesosClusterDispatcher do not support authentication.  You should
+ensure that all network access to the REST API & MesosClusterDispatcher (port 6066 and 7077
+respectively by default) are restricted to hosts that are trusted to submit jobs.
+
+### YARN
+
+For Spark on [YARN](running-on-yarn.html), Spark will automatically handle generating and
+distributing the shared secret. Each application will use a unique shared secret. In
 the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of
 secrets to be secure.
 
-For other resource managers, `spark.authenticate.secret` must be configured on each of the nodes.
-This secret will be shared by all the daemons and applications, so this deployment configuration is
-not as secure as the above, especially when considering multi-tenant clusters.  In this
-configuration, a user with the secret can effectively impersonate any other user.
+### Kubernetes
 
-The Rest Submission Server and the MesosClusterDispatcher do not support authentication.  You should
-ensure that all network access to the REST API & MesosClusterDispatcher (port 6066 and 7077
-respectively by default) are restricted to hosts that are trusted to submit jobs.
+On Kubernetes, Spark will also automatically generate an authentication secret unique to each
+application. The secret is propagated to executor pods using environment variables. This means
+that any user that can list pods in the namespace where the Spark application is running can
+also see their authentication secret. Access control rules should be properly set up by the
+Kubernetes admin to ensure that Spark authentication is secure.
 
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
@@ -738,10 +746,10 @@ tokens for supported will be created.
 ## Secure Interaction with Kubernetes
 
 When talking to Hadoop-based services behind Kerberos, it was noted that Spark needs to obtain delegation tokens
-so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are 
-shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job: 
+so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are
+shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job:
 
-In all cases you must define the environment variable: `HADOOP_CONF_DIR` or 
+In all cases you must define the environment variable: `HADOOP_CONF_DIR` or
 `spark.kubernetes.hadoop.configMapName.`
 
 It also important to note that the KDC needs to be visible from inside the containers.

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 8bf3152..939aa88 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model._
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -29,11 +29,12 @@ import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
-private[spark] class BasicExecutorFeatureStep(kubernetesConf: KubernetesExecutorConf)
+private[spark] class BasicExecutorFeatureStep(
+    kubernetesConf: KubernetesExecutorConf,
+    secMgr: SecurityManager)
   extends KubernetesFeatureConfigStep {
 
   // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
-  private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH)
   private val executorContainerImage = kubernetesConf
     .get(EXECUTOR_CONTAINER_IMAGE)
     .getOrElse(throw new SparkException("Must specify the executor container image"))
@@ -87,44 +88,61 @@ private[spark] class BasicExecutorFeatureStep(kubernetesConf: KubernetesExecutor
     val executorCpuQuantity = new QuantityBuilder(false)
       .withAmount(executorCoresRequest)
       .build()
-    val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
-      new EnvVarBuilder()
-        .withName(ENV_CLASSPATH)
-        .withValue(cp)
-        .build()
-    }
-    val executorExtraJavaOptionsEnv = kubernetesConf
-      .get(EXECUTOR_JAVA_OPTIONS)
-      .map { opts =>
-        val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
-          kubernetesConf.executorId)
-        val delimitedOpts = Utils.splitCommandString(subsOpts)
-        delimitedOpts.zipWithIndex.map {
-          case (opt, index) =>
-            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+
+    val executorEnv: Seq[EnvVar] = {
+        (Seq(
+          (ENV_DRIVER_URL, driverUrl),
+          (ENV_EXECUTOR_CORES, executorCores.toString),
+          (ENV_EXECUTOR_MEMORY, executorMemoryString),
+          (ENV_APPLICATION_ID, kubernetesConf.appId),
+          // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
+          (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
+          (ENV_EXECUTOR_ID, kubernetesConf.executorId)
+        ) ++ kubernetesConf.environment).map { case (k, v) =>
+          new EnvVarBuilder()
+            .withName(k)
+            .withValue(v)
+            .build()
         }
-      }.getOrElse(Seq.empty[EnvVar])
-    val executorEnv = (Seq(
-      (ENV_DRIVER_URL, driverUrl),
-      (ENV_EXECUTOR_CORES, executorCores.toString),
-      (ENV_EXECUTOR_MEMORY, executorMemoryString),
-      (ENV_APPLICATION_ID, kubernetesConf.appId),
-      // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
-      (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-      (ENV_EXECUTOR_ID, kubernetesConf.executorId)) ++
-      kubernetesConf.environment)
-      .map(env => new EnvVarBuilder()
-        .withName(env._1)
-        .withValue(env._2)
-        .build()
-      ) ++ Seq(
-      new EnvVarBuilder()
-        .withName(ENV_EXECUTOR_POD_IP)
-        .withValueFrom(new EnvVarSourceBuilder()
-          .withNewFieldRef("v1", "status.podIP")
+      } ++ {
+        Seq(new EnvVarBuilder()
+          .withName(ENV_EXECUTOR_POD_IP)
+          .withValueFrom(new EnvVarSourceBuilder()
+            .withNewFieldRef("v1", "status.podIP")
+            .build())
           .build())
-        .build()
-    ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
+      } ++ {
+        Option(secMgr.getSecretKey()).map { authSecret =>
+          new EnvVarBuilder()
+            .withName(SecurityManager.ENV_AUTH_SECRET)
+            .withValue(authSecret)
+            .build()
+        }
+      } ++ {
+        kubernetesConf.get(EXECUTOR_CLASS_PATH).map { cp =>
+          new EnvVarBuilder()
+            .withName(ENV_CLASSPATH)
+            .withValue(cp)
+            .build()
+        }
+      } ++ {
+        val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
+          val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
+            kubernetesConf.executorId)
+          Utils.splitCommandString(subsOpts)
+        }
+
+        val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
+          SparkConf.isExecutorStartupConf)
+
+        (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
+          new EnvVarBuilder()
+            .withName(s"$ENV_JAVA_OPT_PREFIX$index")
+            .withValue(opt)
+            .build()
+        }
+      }
+
     val requiredPorts = Seq(
       (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
       .map { case (name, port) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 2f0f949..ac42554 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.mutable
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
@@ -31,6 +31,7 @@ import org.apache.spark.util.{Clock, Utils}
 
 private[spark] class ExecutorPodsAllocator(
     conf: SparkConf,
+    secMgr: SecurityManager,
     executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -135,7 +136,7 @@ private[spark] class ExecutorPodsAllocator(
             newExecutorId.toString,
             applicationId,
             driverPod)
-          val executorPod = executorBuilder.buildFromFeatures(executorConf)
+          val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr)
           val podWithAttachedContainer = new PodBuilder(executorPod.pod)
             .editOrNewSpec()
             .addToContainers(executorPod.container)

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index ce10f76..b31fbb4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -94,6 +94,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
 
     val executorPodsAllocator = new ExecutorPodsAllocator(
       sc.conf,
+      sc.env.securityManager,
       KubernetesExecutorBuilder(kubernetesClient, sc.conf),
       kubernetesClient,
       snapshotsStore,
@@ -110,7 +111,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
 
     new KubernetesClusterSchedulerBackend(
       scheduler.asInstanceOf[TaskSchedulerImpl],
-      sc.env.rpcEnv,
+      sc,
       kubernetesClient,
       requestExecutorsService,
       snapshotsStore,

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 6356b58..68f6f2e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
@@ -30,7 +31,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
-    rpcEnv: RpcEnv,
+    sc: SparkContext,
     kubernetesClient: KubernetesClient,
     requestExecutorsService: ExecutorService,
     snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -38,7 +39,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
-  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
 
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     requestExecutorsService)

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index d24ff0d..ba273ca 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -20,14 +20,14 @@ import java.io.File
 
 import io.fabric8.kubernetes.client.KubernetesClient
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features._
 
 private[spark] class KubernetesExecutorBuilder(
-    provideBasicStep: (KubernetesExecutorConf => BasicExecutorFeatureStep) =
-      new BasicExecutorFeatureStep(_),
+    provideBasicStep: (KubernetesExecutorConf, SecurityManager) => BasicExecutorFeatureStep =
+      new BasicExecutorFeatureStep(_, _),
     provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
       new MountSecretsFeatureStep(_),
     provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
@@ -44,13 +44,16 @@ private[spark] class KubernetesExecutorBuilder(
       new HadoopSparkUserExecutorFeatureStep(_),
     provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
 
-  def buildFromFeatures(kubernetesConf: KubernetesExecutorConf): SparkPod = {
+  def buildFromFeatures(
+      kubernetesConf: KubernetesExecutorConf,
+      secMgr: SecurityManager): SparkPod = {
     val sparkConf = kubernetesConf.sparkConf
     val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
     val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME)
     val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY)
 
-    val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
+    val baseFeatures = Seq(provideBasicStep(kubernetesConf, secMgr),
+      provideLocalDirsStep(kubernetesConf))
     val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
       Seq(provideSecretsStep(kubernetesConf))
     } else Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index d6003c9..6aa8626 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -21,13 +21,14 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model._
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
 
 class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -63,7 +64,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   private var baseConf: SparkConf = _
 
   before {
-    baseConf = new SparkConf()
+    baseConf = new SparkConf(false)
       .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX)
       .set(CONTAINER_IMAGE, EXECUTOR_IMAGE)
@@ -84,7 +85,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("basic executor pod has reasonable defaults") {
-    val step = new BasicExecutorFeatureStep(newExecutorConf())
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())
 
     // The executor pod name and default labels.
@@ -106,7 +107,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     assert(executor.pod.getSpec.getNodeSelector.isEmpty)
     assert(executor.pod.getSpec.getVolumes.isEmpty)
 
-    checkEnv(executor, Map())
+    checkEnv(executor, baseConf, Map())
     checkOwnerReferences(executor.pod, DRIVER_POD_UID)
   }
 
@@ -114,7 +115,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple"
 
     baseConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, longPodNamePrefix)
-    val step = new BasicExecutorFeatureStep(newExecutorConf())
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
   }
 
@@ -122,10 +123,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar")
     baseConf.set(EXECUTOR_CLASS_PATH, "bar=baz")
     val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
-    val step = new BasicExecutorFeatureStep(kconf)
+    val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())
 
-    checkEnv(executor,
+    checkEnv(executor, baseConf,
       Map("SPARK_JAVA_OPT_0" -> "foo=bar",
         ENV_CLASSPATH -> "bar=baz",
         "qux" -> "quux"))
@@ -136,12 +137,27 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     baseConf.set("spark.kubernetes.resource.type", "python")
     baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)
 
-    val step = new BasicExecutorFeatureStep(newExecutorConf())
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())
     // This is checking that basic executor + executorMemory = 1408 + 42 = 1450
     assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi")
   }
 
+  test("auth secret propagation") {
+    val conf = baseConf.clone()
+      .set(NETWORK_AUTH_ENABLED, true)
+      .set("spark.master", "k8s://127.0.0.1")
+
+    val secMgr = new SecurityManager(conf)
+    secMgr.initializeAuth()
+
+    val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
+      secMgr)
+
+    val executor = step.configurePod(SparkPod.initialPod())
+    checkEnv(executor, conf, Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey()))
+  }
+
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
@@ -150,7 +166,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   // Check that the expected environment variables are present.
-  private def checkEnv(executorPod: SparkPod, additionalEnvVars: Map[String, String]): Unit = {
+  private def checkEnv(
+      executorPod: SparkPod,
+      conf: SparkConf,
+      additionalEnvVars: Map[String, String]): Unit = {
     val defaultEnvs = Map(
       ENV_EXECUTOR_ID -> "1",
       ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
@@ -160,10 +179,15 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
       ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
 
-    assert(executorPod.container.getEnv.size() === defaultEnvs.size)
+    val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
+    val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
+    val extraJavaOptsEnvs = extraJavaOpts.zipWithIndex.map { case (opt, ind) =>
+      s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt
+    }.toMap
+
     val mapEnvs = executorPod.container.getEnv.asScala.map {
       x => (x.getName, x.getValue)
     }.toMap
-    assert(defaultEnvs === mapEnvs)
+    assert((defaultEnvs ++ extraJavaOptsEnvs) === mapEnvs)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 303e24b..d4fa31a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -20,13 +20,13 @@ import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{ArgumentMatcher, Matchers, Mock, MockitoAnnotations}
-import org.mockito.Matchers.any
+import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito.{never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -52,6 +52,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
   private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
   private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000L)
+  private val secMgr = new SecurityManager(conf)
 
   private var waitForExecutorPodsClock: ManualClock = _
 
@@ -79,12 +80,12 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
     when(driverPodOperations.get).thenReturn(driverPod)
-    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf])))
+    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr)))
       .thenAnswer(executorPodAnswer())
     snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
     waitForExecutorPodsClock = new ManualClock(0L)
     podsAllocatorUnderTest = new ExecutorPodsAllocator(
-      conf, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+      conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 52e7a12..75232f7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -23,7 +23,7 @@ import org.mockito.Matchers.{eq => mockitoEq}
 import org.mockito.Mockito.{never, verify, when}
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -42,6 +42,9 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private var sc: SparkContext = _
 
   @Mock
+  private var env: SparkEnv = _
+
+  @Mock
   private var rpcEnv: RpcEnv = _
 
   @Mock
@@ -81,6 +84,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     MockitoAnnotations.initMocks(this)
     when(taskScheduler.sc).thenReturn(sc)
     when(sc.conf).thenReturn(sparkConf)
+    when(sc.env).thenReturn(env)
+    when(env.rpcEnv).thenReturn(rpcEnv)
     driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
     when(rpcEnv.setupEndpoint(
       mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
@@ -88,7 +93,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     when(kubernetesClient.pods()).thenReturn(podOperations)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
       taskScheduler,
-      rpcEnv,
+      sc,
       kubernetesClient,
       requestExecutorsService,
       eventQueue,

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index b6a75b1..ef521fd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{Config => _, _}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.mockito.Mockito.{mock, never, verify}
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features._
@@ -39,6 +39,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
   private val KERBEROS_CONF_STEP_TYPE = "kerberos-step"
   private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
 
+  private val secMgr = new SecurityManager(new SparkConf(false))
+
   private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
   private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
@@ -57,7 +59,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
     MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
 
   private val builderUnderTest = new KubernetesExecutorBuilder(
-    _ => basicFeatureStep,
+    (_, _) => basicFeatureStep,
     _ => mountSecretsStep,
     _ => envSecretsStep,
     _ => localDirsStep,
@@ -69,7 +71,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
   test("Basic steps are consistently applied.") {
     val conf = KubernetesTestConf.createExecutorConf()
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
+      builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
   }
 
   test("Apply secrets step if secrets are present.") {
@@ -77,7 +79,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       secretEnvNamesToKeyRefs = Map("secret-name" -> "secret-key"),
       secretNamesToMountPaths = Map("secret" -> "secretMountPath"))
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
+      builderUnderTest.buildFromFeatures(conf, secMgr),
       BASIC_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       SECRETS_STEP_TYPE,
@@ -94,7 +96,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
     val conf = KubernetesTestConf.createExecutorConf(
       volumes = Seq(volumeSpec))
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
+      builderUnderTest.buildFromFeatures(conf, secMgr),
       BASIC_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       MOUNT_VOLUMES_STEP_TYPE)
@@ -107,7 +109,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
         .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name")
         .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name"))
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
+      builderUnderTest.buildFromFeatures(conf, secMgr),
       BASIC_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       HADOOP_CONF_STEP_TYPE,
@@ -123,7 +125,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
         .set(KERBEROS_DT_SECRET_NAME, "dt-secret")
         .set(KERBEROS_DT_SECRET_KEY, "dt-key" ))
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
+      builderUnderTest.buildFromFeatures(conf, secMgr),
       BASIC_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       HADOOP_CONF_STEP_TYPE,
@@ -154,7 +156,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
           .endMetadata()
         .build()))
     val sparkPod = KubernetesExecutorBuilder(kubernetesClient, sparkConf)
-      .buildFromFeatures(kubernetesConf)
+      .buildFromFeatures(kubernetesConf, secMgr)
     PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd90e54/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index b746a01..f8f4b41 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.{SPARK_VERSION, SparkFunSuite}
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 
 class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
@@ -138,6 +139,7 @@ class KubernetesSuite extends SparkFunSuite
       .set("spark.kubernetes.driver.pod.name", driverPodName)
       .set("spark.kubernetes.driver.label.spark-app-locator", appLocator)
       .set("spark.kubernetes.executor.label.spark-app-locator", appLocator)
+      .set(NETWORK_AUTH_ENABLED.key, "true")
     if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
       kubernetesTestComponents.createNamespace()
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org