You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ej...@apache.org on 2020/09/25 19:17:38 UTC
[spark] branch branch-2.4 updated: [SPARK-27872][K8S][2.4] Fix
executor service account inconsistency
This is an automated email from the ASF dual-hosted git repository.
eje pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new bf32ac8 [SPARK-27872][K8S][2.4] Fix executor service account inconsistency
bf32ac8 is described below
commit bf32ac8efa9818be551fe720a71eaba50d5d41ad
Author: nssalian <ne...@stitchfix.com>
AuthorDate: Fri Sep 25 12:05:40 2020 -0700
[SPARK-27872][K8S][2.4] Fix executor service account inconsistency
### What changes were proposed in this pull request?
Similar patch to https://github.com/apache/spark/pull/24748 but applied to the branch-2.4.
Backporting the fix to releases 2.4.x.
Please let me know if I missed some step; I haven't contributed to spark in a long time.
Closes #29844 from nssalian/patch-SPARK-27872.
Authored-by: nssalian <ne...@stitchfix.com>
Signed-off-by: Erik Erlandson <ee...@redhat.com>
---
.../scala/org/apache/spark/deploy/k8s/Config.scala | 16 +++++---
.../apache/spark/deploy/k8s/KubernetesUtils.scala | 15 ++++++++
.../DriverKubernetesCredentialsFeatureStep.scala | 13 ++-----
.../ExecutorKubernetesCredentialsFeatureStep.scala | 45 ++++++++++++++++++++++
.../cluster/k8s/KubernetesExecutorBuilder.scala | 11 +++++-
.../k8s/KubernetesExecutorBuilderSuite.scala | 9 ++++-
.../k8s/integrationtest/BasicTestsSuite.scala | 7 ++++
.../k8s/integrationtest/KubernetesSuite.scala | 4 ++
8 files changed, 102 insertions(+), 18 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index c7338a7..cfff6b9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -61,10 +61,9 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional
- val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
- "spark.kubernetes.authenticate.driver"
- val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
- "spark.kubernetes.authenticate.driver.mounted"
+ val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
+ val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
+ val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
@@ -72,7 +71,7 @@ private[spark] object Config extends Logging {
val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
- val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ val KUBERNETES_DRIVER_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses " +
"this service account when requesting executor pods from the API server. If specific " +
@@ -81,6 +80,13 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional
+ val KUBERNETES_EXECUTOR_SERVICE_ACCOUNT_NAME =
+ ConfigBuilder(s"$KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX.serviceAccountName")
+ .doc("Service account that is used when running the executor pod." +
+ "If this parameter is not setup, the service account defaults to none.")
+ .stringConf
+ .createWithDefault("none")
+
val KUBERNETES_DRIVER_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.driver.limit.cores")
.doc("Specify the hard cpu limit for the driver pod")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 588cd9d..aa3cf83 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,6 +16,10 @@
*/
package org.apache.spark.deploy.k8s
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder,
+ ContainerStateRunning, ContainerStateTerminated,
+ ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
+
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
@@ -60,4 +64,15 @@ private[spark] object KubernetesUtils {
}
def parseMasterUrl(url: String): String = url.substring("k8s://".length)
+
+ def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = {
+ serviceAccount.map { account =>
+ new PodBuilder(pod.pod)
+ .editOrNewSpec()
+ .withServiceAccount(account)
+ .withServiceAccountName(account)
+ .endSpec()
+ .build()
+ }
+ }
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
index ff5ad66..442ec62 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
@@ -27,6 +27,7 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilde
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount
private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf[_])
extends KubernetesFeatureConfigStep {
@@ -41,7 +42,7 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
private val maybeMountedCaCertFile = kubernetesConf.getOption(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
- private val driverServiceAccount = kubernetesConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+ private val driverServiceAccount = kubernetesConf.get(KUBERNETES_DRIVER_SERVICE_ACCOUNT_NAME)
private val oauthTokenBase64 = kubernetesConf
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
@@ -70,15 +71,7 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube
override def configurePod(pod: SparkPod): SparkPod = {
if (!shouldMountSecret) {
- pod.copy(
- pod = driverServiceAccount.map { account =>
- new PodBuilder(pod.pod)
- .editOrNewSpec()
- .withServiceAccount(account)
- .withServiceAccountName(account)
- .endSpec()
- .build()
- }.getOrElse(pod.pod))
+ pod.copy(pod = buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod))
} else {
val driverPodWithMountedKubernetesCredentials =
new PodBuilder(pod.pod)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorKubernetesCredentialsFeatureStep.scala
new file mode 100644
index 0000000..15a4be4
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorKubernetesCredentialsFeatureStep.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DRIVER_SERVICE_ACCOUNT_NAME,
+ KUBERNETES_EXECUTOR_SERVICE_ACCOUNT_NAME}
+import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount
+
+private[spark] class ExecutorKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf[_])
+ extends KubernetesFeatureConfigStep {
+ private lazy val driverServiceAccount =
+ kubernetesConf.get(KUBERNETES_DRIVER_SERVICE_ACCOUNT_NAME)
+ private lazy val executorServiceAccount =
+ kubernetesConf.get(KUBERNETES_EXECUTOR_SERVICE_ACCOUNT_NAME)
+
+ override def configurePod(pod: SparkPod): SparkPod = {
+ pod.copy(
+ // if not setup by the pod template fallback to the driver's sa,
+ // last option is the default sa.
+ pod = if (Option(pod.pod.getSpec.getServiceAccount).isEmpty) {
+ buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod)
+ } else {
+ pod.pod
+ })
+ }
+ override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+ override def getAdditionalKubernetesResources(): Seq[io.fabric8.kubernetes.api.model.HasMetadata]
+ = Seq.empty
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 364b6fb..1d4f81e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -18,12 +18,17 @@ package org.apache.spark.scheduler.cluster.k8s
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep,
+ EnvSecretsFeatureStep, ExecutorKubernetesCredentialsFeatureStep,
+ LocalDirsFeatureStep, MountSecretsFeatureStep}
private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf])
=> BasicExecutorFeatureStep =
new BasicExecutorFeatureStep(_),
+ provideCredentialsStep: (KubernetesConf [KubernetesExecutorSpecificConf])
+ => ExecutorKubernetesCredentialsFeatureStep =
+ new ExecutorKubernetesCredentialsFeatureStep(_),
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> MountSecretsFeatureStep =
new MountSecretsFeatureStep(_),
@@ -50,8 +55,10 @@ private[spark] class KubernetesExecutorBuilder(
val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
Seq(provideVolumesStep(kubernetesConf))
} else Nil
+ val credentialsFeatures = Seq(provideCredentialsStep(kubernetesConf))
- val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature
+ val allFeatures =
+ baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature ++ credentialsFeatures
var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index 44fe4a2..7bbeb26 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -25,12 +25,15 @@ import org.apache.spark.deploy.k8s.features._
class KubernetesExecutorBuilderSuite extends SparkFunSuite {
private val BASIC_STEP_TYPE = "basic"
private val SECRETS_STEP_TYPE = "mount-secrets"
+ private val CREDENTIALS_STEP_TYPE = "creds"
private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
+ private val credentialsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ CREDENTIALS_STEP_TYPE, classOf[ExecutorKubernetesCredentialsFeatureStep])
private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
@@ -42,6 +45,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
private val builderUnderTest = new KubernetesExecutorBuilder(
_ => basicFeatureStep,
+ _ => credentialsStep,
_ => mountSecretsStep,
_ => envSecretsStep,
_ => localDirsStep,
@@ -62,7 +66,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Nil,
Seq.empty[String])
validateStepTypesApplied(
- builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
+ builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE,
+ CREDENTIALS_STEP_TYPE)
}
test("Apply secrets step if secrets are present.") {
@@ -82,6 +87,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
+ CREDENTIALS_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
SECRETS_STEP_TYPE,
ENV_SECRETS_STEP_TYPE)
@@ -110,6 +116,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
+ CREDENTIALS_STEP_TYPE,
MOUNT_VOLUMES_STEP_TYPE)
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 1e9f830..76221e4 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -84,6 +84,13 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
})
}
+ test("All pods have the same service account by default", k8sTestTag) {
+ runSparkPiAndVerifyCompletion(
+ executorPodChecker = (executorPod: Pod) => {
+ doExecutorServiceAccountCheck(executorPod, kubernetesTestComponents.serviceAccountName)
+ })
+ }
+
test("Run extraJVMOptions check on driver", k8sTestTag) {
sparkAppConf
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index d893433..1036589 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -264,6 +264,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite
=== baseMemory)
}
+ protected def doExecutorServiceAccountCheck(executorPod: Pod, account: String): Unit = {
+ doBasicExecutorPodCheck(executorPod)
+ assert(executorPod.getSpec.getServiceAccount == kubernetesTestComponents.serviceAccountName)
+ }
protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getName === driverPodName)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org