You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mc...@apache.org on 2018/03/19 18:30:22 UTC
[1/2] spark git commit: [SPARK-22839][K8S] Remove the use of init-container for downloading remote dependencies
Repository: spark
Updated Branches:
refs/heads/master 4de638c19 -> f15906da1
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index bf4ec04..6a50159 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -38,6 +38,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private val DRIVER_POD_UID = "pod-id"
private val DRIVER_POD_API_VERSION = "v1"
private val DRIVER_POD_KIND = "pod"
+ private val KUBERNETES_RESOURCE_PREFIX = "resource-example"
private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
HasMetadata, Boolean]
@@ -61,6 +62,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private val submissionSteps = Seq(FirstTestConfigurationStep, SecondTestConfigurationStep)
private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _
private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _
+ private var createdContainerArgumentCaptor: ArgumentCaptor[Container] = _
before {
MockitoAnnotations.initMocks(this)
@@ -94,7 +96,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesClient,
false,
"spark",
- loggingPodStatusWatcher)
+ loggingPodStatusWatcher,
+ KUBERNETES_RESOURCE_PREFIX)
submissionClient.run()
val createdPod = createdPodArgumentCaptor.getValue
assert(createdPod.getMetadata.getName === FirstTestConfigurationStep.podName)
@@ -108,62 +111,52 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
SecondTestConfigurationStep.containerName)
}
- test("The client should create the secondary Kubernetes resources.") {
+ test("The client should create Kubernetes resources") {
+ val EXAMPLE_JAVA_OPTS = "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails"
+ val EXPECTED_JAVA_OPTS = "-XX\\:+HeapDumpOnOutOfMemoryError -XX\\:+PrintGCDetails"
val submissionClient = new Client(
submissionSteps,
- new SparkConf(false),
+ new SparkConf(false)
+ .set(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS, EXAMPLE_JAVA_OPTS),
kubernetesClient,
false,
"spark",
- loggingPodStatusWatcher)
+ loggingPodStatusWatcher,
+ KUBERNETES_RESOURCE_PREFIX)
submissionClient.run()
val createdPod = createdPodArgumentCaptor.getValue
val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
- assert(otherCreatedResources.size === 1)
- val createdResource = Iterables.getOnlyElement(otherCreatedResources).asInstanceOf[Secret]
- assert(createdResource.getMetadata.getName === FirstTestConfigurationStep.secretName)
- assert(createdResource.getData.asScala ===
+ assert(otherCreatedResources.size === 2)
+ val secrets = otherCreatedResources.toArray
+ .filter(_.isInstanceOf[Secret]).map(_.asInstanceOf[Secret])
+ val configMaps = otherCreatedResources.toArray
+ .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap])
+ assert(secrets.nonEmpty)
+ val secret = secrets.head
+ assert(secret.getMetadata.getName === FirstTestConfigurationStep.secretName)
+ assert(secret.getData.asScala ===
Map(FirstTestConfigurationStep.secretKey -> FirstTestConfigurationStep.secretData))
- val ownerReference = Iterables.getOnlyElement(createdResource.getMetadata.getOwnerReferences)
+ val ownerReference = Iterables.getOnlyElement(secret.getMetadata.getOwnerReferences)
assert(ownerReference.getName === createdPod.getMetadata.getName)
assert(ownerReference.getKind === DRIVER_POD_KIND)
assert(ownerReference.getUid === DRIVER_POD_UID)
assert(ownerReference.getApiVersion === DRIVER_POD_API_VERSION)
- }
-
- test("The client should attach the driver container with the appropriate JVM options.") {
- val sparkConf = new SparkConf(false)
- .set("spark.logConf", "true")
- .set(
- org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS,
- "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails")
- val submissionClient = new Client(
- submissionSteps,
- sparkConf,
- kubernetesClient,
- false,
- "spark",
- loggingPodStatusWatcher)
- submissionClient.run()
- val createdPod = createdPodArgumentCaptor.getValue
+ assert(configMaps.nonEmpty)
+ val configMap = configMaps.head
+ assert(configMap.getMetadata.getName ===
+ s"$KUBERNETES_RESOURCE_PREFIX-driver-conf-map")
+ assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME))
+ assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains(EXPECTED_JAVA_OPTS))
+ assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains(
+ "spark.custom-conf=custom-conf-value"))
val driverContainer = Iterables.getOnlyElement(createdPod.getSpec.getContainers)
assert(driverContainer.getName === SecondTestConfigurationStep.containerName)
- val driverJvmOptsEnvs = driverContainer.getEnv.asScala.filter { env =>
- env.getName.startsWith(ENV_JAVA_OPT_PREFIX)
- }.sortBy(_.getName)
- assert(driverJvmOptsEnvs.size === 4)
-
- val expectedJvmOptsValues = Seq(
- "-Dspark.logConf=true",
- s"-D${SecondTestConfigurationStep.sparkConfKey}=" +
- s"${SecondTestConfigurationStep.sparkConfValue}",
- "-XX:+HeapDumpOnOutOfMemoryError",
- "-XX:+PrintGCDetails")
- driverJvmOptsEnvs.zip(expectedJvmOptsValues).zipWithIndex.foreach {
- case ((resolvedEnv, expectedJvmOpt), index) =>
- assert(resolvedEnv.getName === s"$ENV_JAVA_OPT_PREFIX$index")
- assert(resolvedEnv.getValue === expectedJvmOpt)
- }
+ val driverEnv = driverContainer.getEnv.asScala.head
+ assert(driverEnv.getName === ENV_SPARK_CONF_DIR)
+ assert(driverEnv.getValue === SPARK_CONF_DIR_INTERNAL)
+ val driverMount = driverContainer.getVolumeMounts.asScala.head
+ assert(driverMount.getName === SPARK_CONF_VOLUME)
+ assert(driverMount.getMountPath === SPARK_CONF_DIR_INTERNAL)
}
test("Waiting for app completion should stall on the watcher") {
@@ -173,7 +166,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesClient,
true,
"spark",
- loggingPodStatusWatcher)
+ loggingPodStatusWatcher,
+ KUBERNETES_RESOURCE_PREFIX)
submissionClient.run()
verify(loggingPodStatusWatcher).awaitCompletion()
}
@@ -209,13 +203,11 @@ private object FirstTestConfigurationStep extends DriverConfigurationStep {
}
private object SecondTestConfigurationStep extends DriverConfigurationStep {
-
val annotationKey = "second-submit"
val annotationValue = "submitted"
val sparkConfKey = "spark.custom-conf"
val sparkConfValue = "custom-conf-value"
val containerName = "driverContainer"
-
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val modifiedPod = new PodBuilder(driverSpec.driverPod)
.editMetadata()
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
index 033d303..df34d2d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
@@ -25,7 +25,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
private val DRIVER_IMAGE = "driver-image"
private val IC_IMAGE = "init-container-image"
private val APP_ID = "spark-app-id"
- private val LAUNCH_TIME = 975256L
+ private val KUBERNETES_RESOURCE_PREFIX = "example-prefix"
private val APP_NAME = "spark"
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
private val APP_ARGS = Array("arg1", "arg2")
@@ -38,7 +38,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigOrchestrator(
APP_ID,
- LAUNCH_TIME,
+ KUBERNETES_RESOURCE_PREFIX,
Some(mainAppResource),
APP_NAME,
MAIN_CLASS,
@@ -49,15 +49,14 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
classOf[BasicDriverConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
- classOf[DependencyResolutionStep]
- )
+ classOf[DependencyResolutionStep])
}
test("Base submission steps without a main app resource.") {
val sparkConf = new SparkConf(false).set(CONTAINER_IMAGE, DRIVER_IMAGE)
val orchestrator = new DriverConfigOrchestrator(
APP_ID,
- LAUNCH_TIME,
+ KUBERNETES_RESOURCE_PREFIX,
Option.empty,
APP_NAME,
MAIN_CLASS,
@@ -67,31 +66,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
orchestrator,
classOf[BasicDriverConfigurationStep],
classOf[DriverServiceBootstrapStep],
- classOf[DriverKubernetesCredentialsStep]
- )
- }
-
- test("Submission steps with an init-container.") {
- val sparkConf = new SparkConf(false)
- .set(CONTAINER_IMAGE, DRIVER_IMAGE)
- .set(INIT_CONTAINER_IMAGE.key, IC_IMAGE)
- .set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar")
- val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
- val orchestrator = new DriverConfigOrchestrator(
- APP_ID,
- LAUNCH_TIME,
- Some(mainAppResource),
- APP_NAME,
- MAIN_CLASS,
- APP_ARGS,
- sparkConf)
- validateStepTypes(
- orchestrator,
- classOf[BasicDriverConfigurationStep],
- classOf[DriverServiceBootstrapStep],
- classOf[DriverKubernetesCredentialsStep],
- classOf[DependencyResolutionStep],
- classOf[DriverInitContainerBootstrapStep])
+ classOf[DriverKubernetesCredentialsStep])
}
test("Submission steps with driver secrets to mount") {
@@ -102,7 +77,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigOrchestrator(
APP_ID,
- LAUNCH_TIME,
+ KUBERNETES_RESOURCE_PREFIX,
Some(mainAppResource),
APP_NAME,
MAIN_CLASS,
@@ -122,7 +97,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
.set(CONTAINER_IMAGE, DRIVER_IMAGE)
var orchestrator = new DriverConfigOrchestrator(
APP_ID,
- LAUNCH_TIME,
+ KUBERNETES_RESOURCE_PREFIX,
Some(JavaMainAppResource("file:///var/apps/jars/main.jar")),
APP_NAME,
MAIN_CLASS,
@@ -135,7 +110,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
sparkConf.set("spark.files", "/path/to/file1,/path/to/file2")
orchestrator = new DriverConfigOrchestrator(
APP_ID,
- LAUNCH_TIME,
+ KUBERNETES_RESOURCE_PREFIX,
Some(JavaMainAppResource("local:///var/apps/jars/main.jar")),
APP_NAME,
MAIN_CLASS,
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
index b136f2c..ce06853 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala
@@ -73,16 +73,13 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
assert(preparedDriverSpec.driverContainer.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
- assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
+ assert(preparedDriverSpec.driverContainer.getEnv.size === 4)
val envs = preparedDriverSpec.driverContainer
.getEnv
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs(ENV_CLASSPATH) === "/opt/spark/spark-examples.jar")
- assert(envs(ENV_DRIVER_MEMORY) === "256M")
- assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
- assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2 \"arg 3\"")
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
@@ -112,7 +109,8 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
val expectedSparkConf = Map(
KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
"spark.app.id" -> APP_ID,
- KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX)
+ KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
+ "spark.kubernetes.submitInDriver" -> "true")
assert(resolvedSparkConf === expectedSparkConf)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
index 991b03c..ca43fc9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
@@ -29,24 +29,17 @@ import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
class DependencyResolutionStepSuite extends SparkFunSuite {
private val SPARK_JARS = Seq(
- "hdfs://localhost:9000/apps/jars/jar1.jar",
- "file:///home/user/apps/jars/jar2.jar",
- "local:///var/apps/jars/jar3.jar")
+ "apps/jars/jar1.jar",
+ "local:///var/apps/jars/jar2.jar")
private val SPARK_FILES = Seq(
- "file:///home/user/apps/files/file1.txt",
- "hdfs://localhost:9000/apps/files/file2.txt",
- "local:///var/apps/files/file3.txt")
-
- private val JARS_DOWNLOAD_PATH = "/mnt/spark-data/jars"
- private val FILES_DOWNLOAD_PATH = "/mnt/spark-data/files"
+ "apps/files/file1.txt",
+ "local:///var/apps/files/file2.txt")
test("Added dependencies should be resolved in Spark configuration and environment") {
val dependencyResolutionStep = new DependencyResolutionStep(
SPARK_JARS,
- SPARK_FILES,
- JARS_DOWNLOAD_PATH,
- FILES_DOWNLOAD_PATH)
+ SPARK_FILES)
val driverPod = new PodBuilder().build()
val baseDriverSpec = KubernetesDriverSpec(
driverPod = driverPod,
@@ -58,24 +51,19 @@ class DependencyResolutionStepSuite extends SparkFunSuite {
assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
val resolvedSparkJars = preparedDriverSpec.driverSparkConf.get("spark.jars").split(",").toSet
val expectedResolvedSparkJars = Set(
- "hdfs://localhost:9000/apps/jars/jar1.jar",
- s"$JARS_DOWNLOAD_PATH/jar2.jar",
- "/var/apps/jars/jar3.jar")
+ "apps/jars/jar1.jar",
+ "/var/apps/jars/jar2.jar")
assert(resolvedSparkJars === expectedResolvedSparkJars)
val resolvedSparkFiles = preparedDriverSpec.driverSparkConf.get("spark.files").split(",").toSet
val expectedResolvedSparkFiles = Set(
- s"$FILES_DOWNLOAD_PATH/file1.txt",
- s"hdfs://localhost:9000/apps/files/file2.txt",
- s"/var/apps/files/file3.txt")
+ "apps/files/file1.txt",
+ "/var/apps/files/file2.txt")
assert(resolvedSparkFiles === expectedResolvedSparkFiles)
val driverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
assert(driverEnv.size === 1)
assert(driverEnv.head.getName === ENV_MOUNTED_CLASSPATH)
val resolvedDriverClasspath = driverEnv.head.getValue.split(File.pathSeparator).toSet
- val expectedResolvedDriverClasspath = Set(
- s"$JARS_DOWNLOAD_PATH/jar1.jar",
- s"$JARS_DOWNLOAD_PATH/jar2.jar",
- "/var/apps/jars/jar3.jar")
+ val expectedResolvedDriverClasspath = expectedResolvedSparkJars
assert(resolvedDriverClasspath === expectedResolvedDriverClasspath)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala
deleted file mode 100644
index 758871e..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.StringReader
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-import com.google.common.collect.Maps
-import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
-import org.apache.spark.util.Utils
-
-class DriverInitContainerBootstrapStepSuite extends SparkFunSuite {
-
- private val CONFIG_MAP_NAME = "spark-init-config-map"
- private val CONFIG_MAP_KEY = "spark-init-config-map-key"
-
- test("The init container bootstrap step should use all of the init container steps") {
- val baseDriverSpec = KubernetesDriverSpec(
- driverPod = new PodBuilder().build(),
- driverContainer = new ContainerBuilder().build(),
- driverSparkConf = new SparkConf(false),
- otherKubernetesResources = Seq.empty[HasMetadata])
- val initContainerSteps = Seq(
- FirstTestInitContainerConfigurationStep,
- SecondTestInitContainerConfigurationStep)
- val bootstrapStep = new DriverInitContainerBootstrapStep(
- initContainerSteps,
- CONFIG_MAP_NAME,
- CONFIG_MAP_KEY)
-
- val preparedDriverSpec = bootstrapStep.configureDriver(baseDriverSpec)
-
- assert(preparedDriverSpec.driverPod.getMetadata.getLabels.asScala ===
- FirstTestInitContainerConfigurationStep.additionalLabels)
- val additionalDriverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
- assert(additionalDriverEnv.size === 1)
- assert(additionalDriverEnv.head.getName ===
- FirstTestInitContainerConfigurationStep.additionalMainContainerEnvKey)
- assert(additionalDriverEnv.head.getValue ===
- FirstTestInitContainerConfigurationStep.additionalMainContainerEnvValue)
-
- assert(preparedDriverSpec.otherKubernetesResources.size === 2)
- assert(preparedDriverSpec.otherKubernetesResources.contains(
- FirstTestInitContainerConfigurationStep.additionalKubernetesResource))
- assert(preparedDriverSpec.otherKubernetesResources.exists {
- case configMap: ConfigMap =>
- val hasMatchingName = configMap.getMetadata.getName == CONFIG_MAP_NAME
- val configMapData = configMap.getData.asScala
- val hasCorrectNumberOfEntries = configMapData.size == 1
- val initContainerPropertiesRaw = configMapData(CONFIG_MAP_KEY)
- val initContainerProperties = new Properties()
- Utils.tryWithResource(new StringReader(initContainerPropertiesRaw)) {
- initContainerProperties.load(_)
- }
- val initContainerPropertiesMap = Maps.fromProperties(initContainerProperties).asScala
- val expectedInitContainerProperties = Map(
- SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyKey ->
- SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyValue)
- val hasMatchingProperties = initContainerPropertiesMap == expectedInitContainerProperties
- hasMatchingName && hasCorrectNumberOfEntries && hasMatchingProperties
-
- case _ => false
- })
-
- val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers
- assert(initContainers.size() === 1)
- val initContainerEnv = initContainers.get(0).getEnv.asScala
- assert(initContainerEnv.size === 1)
- assert(initContainerEnv.head.getName ===
- SecondTestInitContainerConfigurationStep.additionalInitContainerEnvKey)
- assert(initContainerEnv.head.getValue ===
- SecondTestInitContainerConfigurationStep.additionalInitContainerEnvValue)
-
- val expectedSparkConf = Map(
- INIT_CONTAINER_CONFIG_MAP_NAME.key -> CONFIG_MAP_NAME,
- INIT_CONTAINER_CONFIG_MAP_KEY_CONF.key -> CONFIG_MAP_KEY,
- SecondTestInitContainerConfigurationStep.additionalDriverSparkConfKey ->
- SecondTestInitContainerConfigurationStep.additionalDriverSparkConfValue)
- assert(preparedDriverSpec.driverSparkConf.getAll.toMap === expectedSparkConf)
- }
-}
-
-private object FirstTestInitContainerConfigurationStep extends InitContainerConfigurationStep {
-
- val additionalLabels = Map("additionalLabelkey" -> "additionalLabelValue")
- val additionalMainContainerEnvKey = "TEST_ENV_MAIN_KEY"
- val additionalMainContainerEnvValue = "TEST_ENV_MAIN_VALUE"
- val additionalKubernetesResource = new SecretBuilder()
- .withNewMetadata()
- .withName("test-secret")
- .endMetadata()
- .addToData("secret-key", "secret-value")
- .build()
-
- override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = {
- val driverPod = new PodBuilder(initContainerSpec.driverPod)
- .editOrNewMetadata()
- .addToLabels(additionalLabels.asJava)
- .endMetadata()
- .build()
- val mainContainer = new ContainerBuilder(initContainerSpec.driverContainer)
- .addNewEnv()
- .withName(additionalMainContainerEnvKey)
- .withValue(additionalMainContainerEnvValue)
- .endEnv()
- .build()
- initContainerSpec.copy(
- driverPod = driverPod,
- driverContainer = mainContainer,
- dependentResources = initContainerSpec.dependentResources ++
- Seq(additionalKubernetesResource))
- }
-}
-
-private object SecondTestInitContainerConfigurationStep extends InitContainerConfigurationStep {
- val additionalInitContainerEnvKey = "TEST_ENV_INIT_KEY"
- val additionalInitContainerEnvValue = "TEST_ENV_INIT_VALUE"
- val additionalInitContainerPropertyKey = "spark.initcontainer.testkey"
- val additionalInitContainerPropertyValue = "testvalue"
- val additionalDriverSparkConfKey = "spark.driver.testkey"
- val additionalDriverSparkConfValue = "spark.driver.testvalue"
-
- override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = {
- val initContainer = new ContainerBuilder(initContainerSpec.initContainer)
- .addNewEnv()
- .withName(additionalInitContainerEnvKey)
- .withValue(additionalInitContainerEnvValue)
- .endEnv()
- .build()
- val initContainerProperties = initContainerSpec.properties ++
- Map(additionalInitContainerPropertyKey -> additionalInitContainerPropertyValue)
- val driverSparkConf = initContainerSpec.driverSparkConf ++
- Map(additionalDriverSparkConfKey -> additionalDriverSparkConfValue)
- initContainerSpec.copy(
- initContainer = initContainer,
- properties = initContainerProperties,
- driverSparkConf = driverSparkConf)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala
deleted file mode 100644
index 4553f9f..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model._
-import org.mockito.{Mock, MockitoAnnotations}
-import org.mockito.Matchers.any
-import org.mockito.Mockito.when
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer}
-import org.apache.spark.deploy.k8s.Config._
-
-class BasicInitContainerConfigurationStepSuite extends SparkFunSuite with BeforeAndAfter {
-
- private val SPARK_JARS = Seq(
- "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
- private val SPARK_FILES = Seq(
- "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
- private val JARS_DOWNLOAD_PATH = "/var/data/jars"
- private val FILES_DOWNLOAD_PATH = "/var/data/files"
- private val POD_LABEL = Map("bootstrap" -> "true")
- private val INIT_CONTAINER_NAME = "init-container"
- private val DRIVER_CONTAINER_NAME = "driver-container"
-
- @Mock
- private var podAndInitContainerBootstrap : InitContainerBootstrap = _
-
- before {
- MockitoAnnotations.initMocks(this)
- when(podAndInitContainerBootstrap.bootstrapInitContainer(
- any[PodWithDetachedInitContainer])).thenAnswer(new Answer[PodWithDetachedInitContainer] {
- override def answer(invocation: InvocationOnMock) : PodWithDetachedInitContainer = {
- val pod = invocation.getArgumentAt(0, classOf[PodWithDetachedInitContainer])
- pod.copy(
- pod = new PodBuilder(pod.pod)
- .withNewMetadata()
- .addToLabels("bootstrap", "true")
- .endMetadata()
- .withNewSpec().endSpec()
- .build(),
- initContainer = new ContainerBuilder()
- .withName(INIT_CONTAINER_NAME)
- .build(),
- mainContainer = new ContainerBuilder()
- .withName(DRIVER_CONTAINER_NAME)
- .build()
- )}})
- }
-
- test("additionalDriverSparkConf with mix of remote files and jars") {
- val baseInitStep = new BasicInitContainerConfigurationStep(
- SPARK_JARS,
- SPARK_FILES,
- JARS_DOWNLOAD_PATH,
- FILES_DOWNLOAD_PATH,
- podAndInitContainerBootstrap)
- val expectedDriverSparkConf = Map(
- JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH,
- FILES_DOWNLOAD_LOCATION.key -> FILES_DOWNLOAD_PATH,
- INIT_CONTAINER_REMOTE_JARS.key -> "hdfs://localhost:9000/app/jars/jar1.jar",
- INIT_CONTAINER_REMOTE_FILES.key -> "hdfs://localhost:9000/app/files/file1.txt")
- val initContainerSpec = InitContainerSpec(
- Map.empty[String, String],
- Map.empty[String, String],
- new Container(),
- new Container(),
- new Pod,
- Seq.empty[HasMetadata])
- val returnContainerSpec = baseInitStep.configureInitContainer(initContainerSpec)
- assert(expectedDriverSparkConf === returnContainerSpec.properties)
- assert(returnContainerSpec.initContainer.getName === INIT_CONTAINER_NAME)
- assert(returnContainerSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
- assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala
deleted file mode 100644
index 09b42e4..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-
-class InitContainerConfigOrchestratorSuite extends SparkFunSuite {
-
- private val DOCKER_IMAGE = "init-container"
- private val SPARK_JARS = Seq(
- "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
- private val SPARK_FILES = Seq(
- "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
- private val JARS_DOWNLOAD_PATH = "/var/data/jars"
- private val FILES_DOWNLOAD_PATH = "/var/data/files"
- private val DOCKER_IMAGE_PULL_POLICY: String = "IfNotPresent"
- private val CUSTOM_LABEL_KEY = "customLabel"
- private val CUSTOM_LABEL_VALUE = "customLabelValue"
- private val INIT_CONTAINER_CONFIG_MAP_NAME = "spark-init-config-map"
- private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key"
- private val SECRET_FOO = "foo"
- private val SECRET_BAR = "bar"
- private val SECRET_MOUNT_PATH = "/etc/secrets/init-container"
-
- test("including basic configuration step") {
- val sparkConf = new SparkConf(true)
- .set(CONTAINER_IMAGE, DOCKER_IMAGE)
- .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
-
- val orchestrator = new InitContainerConfigOrchestrator(
- SPARK_JARS.take(1),
- SPARK_FILES,
- JARS_DOWNLOAD_PATH,
- FILES_DOWNLOAD_PATH,
- DOCKER_IMAGE_PULL_POLICY,
- INIT_CONTAINER_CONFIG_MAP_NAME,
- INIT_CONTAINER_CONFIG_MAP_KEY,
- sparkConf)
- val initSteps = orchestrator.getAllConfigurationSteps
- assert(initSteps.lengthCompare(1) == 0)
- assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep])
- }
-
- test("including step to mount user-specified secrets") {
- val sparkConf = new SparkConf(false)
- .set(CONTAINER_IMAGE, DOCKER_IMAGE)
- .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
- .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
-
- val orchestrator = new InitContainerConfigOrchestrator(
- SPARK_JARS.take(1),
- SPARK_FILES,
- JARS_DOWNLOAD_PATH,
- FILES_DOWNLOAD_PATH,
- DOCKER_IMAGE_PULL_POLICY,
- INIT_CONTAINER_CONFIG_MAP_NAME,
- INIT_CONTAINER_CONFIG_MAP_KEY,
- sparkConf)
- val initSteps = orchestrator.getAllConfigurationSteps
- assert(initSteps.length === 2)
- assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep])
- assert(initSteps(1).isInstanceOf[InitContainerMountSecretsStep])
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala
deleted file mode 100644
index 7ac0bde..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}
-
-class InitContainerMountSecretsStepSuite extends SparkFunSuite {
-
- private val SECRET_FOO = "foo"
- private val SECRET_BAR = "bar"
- private val SECRET_MOUNT_PATH = "/etc/secrets/init-container"
-
- test("mounts all given secrets") {
- val baseInitContainerSpec = InitContainerSpec(
- Map.empty,
- Map.empty,
- new ContainerBuilder().build(),
- new ContainerBuilder().build(),
- new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
- Seq.empty)
- val secretNamesToMountPaths = Map(
- SECRET_FOO -> SECRET_MOUNT_PATH,
- SECRET_BAR -> SECRET_MOUNT_PATH)
-
- val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
- val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
- val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
- baseInitContainerSpec)
- val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer
-
- Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
- assert(SecretVolumeUtils.containerHasVolume(
- initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index a3c615b..7755b93 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -19,15 +19,13 @@ package org.apache.spark.scheduler.cluster.k8s
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model._
-import org.mockito.{AdditionalAnswers, MockitoAnnotations}
-import org.mockito.Matchers.any
-import org.mockito.Mockito._
+import org.mockito.MockitoAnnotations
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.MountSecretsBootstrap
class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
@@ -55,10 +53,11 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
.set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
.set(CONTAINER_IMAGE, executorImage)
+ .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
}
test("basic executor pod has reasonable defaults") {
- val factory = new ExecutorPodFactory(baseConf, None, None, None)
+ val factory = new ExecutorPodFactory(baseConf, None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
@@ -89,7 +88,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
"loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")
- val factory = new ExecutorPodFactory(conf, None, None, None)
+ val factory = new ExecutorPodFactory(conf, None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
@@ -101,7 +100,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
- val factory = new ExecutorPodFactory(conf, None, None, None)
+ val factory = new ExecutorPodFactory(conf, None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]())
@@ -116,11 +115,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
val conf = baseConf.clone()
val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
- val factory = new ExecutorPodFactory(
- conf,
- Some(secretsBootstrap),
- None,
- None)
+ val factory = new ExecutorPodFactory(conf, Some(secretsBootstrap))
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
@@ -138,50 +133,6 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
checkOwnerReferences(executor, driverPodUid)
}
- test("init-container bootstrap step adds an init container") {
- val conf = baseConf.clone()
- val initContainerBootstrap = mock(classOf[InitContainerBootstrap])
- when(initContainerBootstrap.bootstrapInitContainer(
- any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
-
- val factory = new ExecutorPodFactory(
- conf,
- None,
- Some(initContainerBootstrap),
- None)
- val executor = factory.createExecutorPod(
- "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
-
- assert(executor.getSpec.getInitContainers.size() === 1)
- checkOwnerReferences(executor, driverPodUid)
- }
-
- test("init-container with secrets mount bootstrap") {
- val conf = baseConf.clone()
- val initContainerBootstrap = mock(classOf[InitContainerBootstrap])
- when(initContainerBootstrap.bootstrapInitContainer(
- any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
- val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
-
- val factory = new ExecutorPodFactory(
- conf,
- Some(secretsBootstrap),
- Some(initContainerBootstrap),
- Some(secretsBootstrap))
- val executor = factory.createExecutorPod(
- "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
-
- assert(executor.getSpec.getVolumes.size() === 1)
- assert(SecretVolumeUtils.podHasVolume(executor, "secret1-volume"))
- assert(SecretVolumeUtils.containerHasVolume(
- executor.getSpec.getContainers.get(0), "secret1-volume", "/var/secret1"))
- assert(executor.getSpec.getInitContainers.size() === 1)
- assert(SecretVolumeUtils.containerHasVolume(
- executor.getSpec.getInitContainers.get(0), "secret1-volume", "/var/secret1"))
-
- checkOwnerReferences(executor, driverPodUid)
- }
-
// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)
@@ -197,8 +148,8 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
ENV_EXECUTOR_CORES -> "1",
ENV_EXECUTOR_MEMORY -> "1g",
ENV_APPLICATION_ID -> "dummy",
- ENV_EXECUTOR_POD_IP -> null,
- ENV_MOUNTED_CLASSPATH -> "/var/spark-data/spark-jars/*") ++ additionalEnvVars
+ ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
+ ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
assert(executor.getSpec.getContainers.size() === 1)
assert(executor.getSpec.getContainers.get(0).getEnv.size() === defaultEnvs.size)
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
index 491b7cf..9badf85 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
@@ -40,7 +40,6 @@ RUN set -ex && \
COPY ${spark_jars} /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
-COPY conf /opt/spark/conf
COPY ${img_path}/spark/entrypoint.sh /opt/
COPY examples /opt/spark/examples
COPY data /opt/spark/data
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index d0cf284..3e16611 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -56,14 +56,10 @@ fi
case "$SPARK_K8S_CMD" in
driver)
CMD=(
- ${JAVA_HOME}/bin/java
- "${SPARK_JAVA_OPTS[@]}"
- -cp "$SPARK_CLASSPATH"
- -Xms$SPARK_DRIVER_MEMORY
- -Xmx$SPARK_DRIVER_MEMORY
- -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS
- $SPARK_DRIVER_CLASS
- $SPARK_DRIVER_ARGS
+ "$SPARK_HOME/bin/spark-submit"
+ --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+ --deploy-mode client
+ "$@"
)
;;
@@ -83,14 +79,6 @@ case "$SPARK_K8S_CMD" in
)
;;
- init)
- CMD=(
- "$SPARK_HOME/bin/spark-class"
- "org.apache.spark.deploy.k8s.SparkPodInitContainer"
- "$@"
- )
- ;;
-
*)
echo "Unknown command: $SPARK_K8S_CMD" 1>&2
exit 1
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
[2/2] spark git commit: [SPARK-22839][K8S] Remove the use of
init-container for downloading remote dependencies
Posted by mc...@apache.org.
[SPARK-22839][K8S] Remove the use of init-container for downloading remote dependencies
## What changes were proposed in this pull request?
Removes the init-container that was used for downloading remote dependencies. This change builds on the work done by vanzin in an attempt to refactor driver/executor configuration, which is elaborated in [this](https://issues.apache.org/jira/browse/SPARK-22839) ticket.
## How was this patch tested?
This patch was tested with unit and integration tests.
Author: Ilan Filonenko <if...@cornell.edu>
Closes #20669 from ifilonenko/remove-init-container.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f15906da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f15906da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f15906da
Branch: refs/heads/master
Commit: f15906da153f139b698e192ec6f82f078f896f1e
Parents: 4de638c
Author: Ilan Filonenko <if...@cornell.edu>
Authored: Mon Mar 19 11:29:56 2018 -0700
Committer: mcheah <mc...@palantir.com>
Committed: Mon Mar 19 11:29:56 2018 -0700
----------------------------------------------------------------------
bin/docker-image-tool.sh | 9 +-
.../org/apache/spark/deploy/SparkSubmit.scala | 2 -
docs/running-on-kubernetes.md | 71 +-------
.../spark/examples/SparkRemoteFileTest.scala | 48 ++++++
.../org/apache/spark/deploy/k8s/Config.scala | 73 +--------
.../org/apache/spark/deploy/k8s/Constants.scala | 21 +--
.../deploy/k8s/InitContainerBootstrap.scala | 120 --------------
.../spark/deploy/k8s/KubernetesUtils.scala | 63 +-------
.../k8s/PodWithDetachedInitContainer.scala | 31 ----
.../deploy/k8s/SparkPodInitContainer.scala | 116 --------------
.../k8s/submit/DriverConfigOrchestrator.scala | 45 +-----
.../submit/KubernetesClientApplication.scala | 84 ++++++----
.../steps/BasicDriverConfigurationStep.scala | 32 ++--
.../submit/steps/DependencyResolutionStep.scala | 18 +--
.../DriverInitContainerBootstrapStep.scala | 95 -----------
.../steps/DriverKubernetesCredentialsStep.scala | 2 +-
.../BasicInitContainerConfigurationStep.scala | 67 --------
.../InitContainerConfigOrchestrator.scala | 79 ---------
.../InitContainerConfigurationStep.scala | 25 ---
.../InitContainerMountSecretsStep.scala | 36 -----
.../steps/initcontainer/InitContainerSpec.scala | 37 -----
.../cluster/k8s/ExecutorPodFactory.scala | 43 +----
.../cluster/k8s/KubernetesClusterManager.scala | 65 +-------
.../deploy/k8s/SparkPodInitContainerSuite.scala | 86 ----------
.../spark/deploy/k8s/submit/ClientSuite.scala | 82 +++++-----
.../submit/DriverConfigOrchestratorSuite.scala | 41 +----
.../BasicDriverConfigurationStepSuite.scala | 8 +-
.../steps/DependencyResolutionStepSuite.scala | 32 ++--
.../DriverInitContainerBootstrapStepSuite.scala | 160 -------------------
...sicInitContainerConfigurationStepSuite.scala | 95 -----------
.../InitContainerConfigOrchestratorSuite.scala | 80 ----------
.../InitContainerMountSecretsStepSuite.scala | 52 ------
.../cluster/k8s/ExecutorPodFactorySuite.scala | 67 ++------
.../src/main/dockerfiles/spark/Dockerfile | 1 -
.../src/main/dockerfiles/spark/entrypoint.sh | 20 +--
35 files changed, 241 insertions(+), 1665 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/bin/docker-image-tool.sh
----------------------------------------------------------------------
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index 0d0f564..f090240 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -64,9 +64,11 @@ function build {
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi
+ local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+
docker build "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
- -f "$IMG_PATH/spark/Dockerfile" .
+ -f "$DOCKERFILE" .
}
function push {
@@ -84,6 +86,7 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.
Options:
+ -f file Dockerfile to build. By default builds the Dockerfile shipped with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
@@ -113,10 +116,12 @@ fi
REPO=
TAG=
-while getopts mr:t: option
+DOCKERFILE=
+while getopts f:mr:t: option
do
case "${option}"
in
+ f) DOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
m)
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1e38196..329bde0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -320,8 +320,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
- case (KUBERNETES, CLIENT) =>
- printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 3c7586e..975b28d 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -126,29 +126,6 @@ Those dependencies can be added to the classpath by referencing them with `local
dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
client's local file system is currently not yet supported.
-
-### Using Remote Dependencies
-When there are application dependencies hosted in remote locations like HDFS or HTTP servers, the driver and executor pods
-need a Kubernetes [init-container](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/) for downloading
-the dependencies so the driver and executor containers can use them locally.
-
-The init-container handles remote dependencies specified in `spark.jars` (or the `--jars` option of `spark-submit`) and
-`spark.files` (or the `--files` option of `spark-submit`). It also handles remotely hosted main application resources, e.g.,
-the main application jar. The following shows an example of using remote dependencies with the `spark-submit` command:
-
-```bash
-$ bin/spark-submit \
- --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
- --deploy-mode cluster \
- --name spark-pi \
- --class org.apache.spark.examples.SparkPi \
- --jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
- --files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
- --conf spark.executor.instances=5 \
- --conf spark.kubernetes.container.image=<spark-image> \
- https://path/to/examples.jar
-```
-
## Secret Management
Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
Spark application to access secured services. To mount a user-specified secret into the driver container, users can use
@@ -163,10 +140,6 @@ namespace as that of the driver and executor pods. For example, to mount a secre
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
```
-Note that if an init-container is used, any secret mounted into the driver container will also be mounted into the
-init-container of the driver. Similarly, any secret mounted into an executor container will also be mounted into the
-init-container of the executor.
-
## Introspection and Debugging
These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -605,50 +578,11 @@ specific to Spark on Kubernetes.
</td>
</tr>
<tr>
- <td><code>spark.kubernetes.mountDependencies.jarsDownloadDir</code></td>
- <td><code>/var/spark-data/spark-jars</code></td>
- <td>
- Location to download jars to in the driver and executors.
- This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
- </td>
-</tr>
-<tr>
- <td><code>spark.kubernetes.mountDependencies.filesDownloadDir</code></td>
- <td><code>/var/spark-data/spark-files</code></td>
- <td>
- Location to download jars to in the driver and executors.
- This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
- </td>
-</tr>
-<tr>
- <td><code>spark.kubernetes.mountDependencies.timeout</code></td>
- <td>300s</td>
- <td>
- Timeout in seconds before aborting the attempt to download and unpack dependencies from remote locations into
- the driver and executor pods.
- </td>
-</tr>
-<tr>
- <td><code>spark.kubernetes.mountDependencies.maxSimultaneousDownloads</code></td>
- <td>5</td>
- <td>
- Maximum number of remote dependencies to download simultaneously in a driver or executor pod.
- </td>
-</tr>
-<tr>
- <td><code>spark.kubernetes.initContainer.image</code></td>
- <td><code>(value of spark.kubernetes.container.image)</code></td>
- <td>
- Custom container image for the init container of both driver and executors.
- </td>
-</tr>
-<tr>
<td><code>spark.kubernetes.driver.secrets.[SecretName]</code></td>
<td>(none)</td>
<td>
Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the driver pod on the path specified in the value. For example,
- <code>spark.kubernetes.driver.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
- the secret will also be added to the init-container in the driver pod.
+ <code>spark.kubernetes.driver.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
<tr>
@@ -656,8 +590,7 @@ specific to Spark on Kubernetes.
<td>(none)</td>
<td>
Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the executor pod on the path specified in the value. For example,
- <code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
- the secret will also be added to the init-container in the executor pod.
+ <code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
</table>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala b/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
new file mode 100644
index 0000000..64076f2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import java.io.File
+
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.SparkSession
+
+/** Usage: SparkRemoteFileTest <file> */
+object SparkRemoteFileTest {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: SparkRemoteFileTest <file>")
+ System.exit(1)
+ }
+ val spark = SparkSession
+ .builder()
+ .appName("SparkRemoteFileTest")
+ .getOrCreate()
+ val sc = spark.sparkContext
+ val rdd = sc.parallelize(Seq(1)).map(_ => {
+ val localLocation = SparkFiles.get(args(0))
+ println(s"${args(0)} is stored at: $localLocation")
+ new File(localLocation).isFile
+ })
+ val truthCheck = rdd.collect().head
+ println(s"Mounting of ${args(0)} was $truthCheck")
+ spark.stop()
+ }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 471196a..da34a7e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -79,6 +79,12 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional
+ val KUBERNETES_DRIVER_SUBMIT_CHECK =
+ ConfigBuilder("spark.kubernetes.submitInDriver")
+ .internal()
+ .booleanConf
+ .createOptional
+
val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for each executor pod")
@@ -135,73 +141,6 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")
- val JARS_DOWNLOAD_LOCATION =
- ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
- .doc("Location to download jars to in the driver and executors. When using " +
- "spark-submit, this directory must be empty and will be mounted as an empty directory " +
- "volume on the driver and executor pod.")
- .stringConf
- .createWithDefault("/var/spark-data/spark-jars")
-
- val FILES_DOWNLOAD_LOCATION =
- ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
- .doc("Location to download files to in the driver and executors. When using " +
- "spark-submit, this directory must be empty and will be mounted as an empty directory " +
- "volume on the driver and executor pods.")
- .stringConf
- .createWithDefault("/var/spark-data/spark-files")
-
- val INIT_CONTAINER_IMAGE =
- ConfigBuilder("spark.kubernetes.initContainer.image")
- .doc("Image for the driver and executor's init-container for downloading dependencies.")
- .fallbackConf(CONTAINER_IMAGE)
-
- val INIT_CONTAINER_MOUNT_TIMEOUT =
- ConfigBuilder("spark.kubernetes.mountDependencies.timeout")
- .doc("Timeout before aborting the attempt to download and unpack dependencies from remote " +
- "locations into the driver and executor pods.")
- .timeConf(TimeUnit.SECONDS)
- .createWithDefault(300)
-
- val INIT_CONTAINER_MAX_THREAD_POOL_SIZE =
- ConfigBuilder("spark.kubernetes.mountDependencies.maxSimultaneousDownloads")
- .doc("Maximum number of remote dependencies to download simultaneously in a driver or " +
- "executor pod.")
- .intConf
- .createWithDefault(5)
-
- val INIT_CONTAINER_REMOTE_JARS =
- ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
- .doc("Comma-separated list of jar URIs to download in the init-container. This is " +
- "calculated from spark.jars.")
- .internal()
- .stringConf
- .createOptional
-
- val INIT_CONTAINER_REMOTE_FILES =
- ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
- .doc("Comma-separated list of file URIs to download in the init-container. This is " +
- "calculated from spark.files.")
- .internal()
- .stringConf
- .createOptional
-
- val INIT_CONTAINER_CONFIG_MAP_NAME =
- ConfigBuilder("spark.kubernetes.initContainer.configMapName")
- .doc("Name of the config map to use in the init-container that retrieves submitted files " +
- "for the executor.")
- .internal()
- .stringConf
- .createOptional
-
- val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
- ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
- .doc("Key for the entry in the init container config map for submitted files that " +
- "corresponds to the properties for this init-container.")
- .internal()
- .stringConf
- .createOptional
-
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 9411956..8da5f24 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -63,22 +63,13 @@ private[spark] object Constants {
val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
val ENV_CLASSPATH = "SPARK_CLASSPATH"
- val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
- val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
- val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
- val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
- val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
-
- // Bootstrapping dependencies with the init-container
- val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
- val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files-volume"
- val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
- val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
- val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
- val INIT_CONTAINER_PROPERTIES_FILE_PATH =
- s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
- val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
+ val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
+ // Spark app configs for containers
+ val SPARK_CONF_VOLUME = "spark-conf-volume"
+ val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
+ val SPARK_CONF_FILE_NAME = "spark.properties"
+ val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
deleted file mode 100644
index f6a57df..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-
-/**
- * Bootstraps an init-container for downloading remote dependencies. This is separated out from
- * the init-container steps API because this component can be used to bootstrap init-containers
- * for both the driver and executors.
- */
-private[spark] class InitContainerBootstrap(
- initContainerImage: String,
- imagePullPolicy: String,
- jarsDownloadPath: String,
- filesDownloadPath: String,
- configMapName: String,
- configMapKey: String,
- sparkRole: String,
- sparkConf: SparkConf) {
-
- /**
- * Bootstraps an init-container that downloads dependencies to be used by a main container.
- */
- def bootstrapInitContainer(
- original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
- val sharedVolumeMounts = Seq[VolumeMount](
- new VolumeMountBuilder()
- .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
- .withMountPath(jarsDownloadPath)
- .build(),
- new VolumeMountBuilder()
- .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
- .withMountPath(filesDownloadPath)
- .build())
-
- val customEnvVarKeyPrefix = sparkRole match {
- case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
- case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
- case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
- }
- val customEnvVars = sparkConf.getAllWithPrefix(customEnvVarKeyPrefix).toSeq.map {
- case (key, value) =>
- new EnvVarBuilder()
- .withName(key)
- .withValue(value)
- .build()
- }
-
- val initContainer = new ContainerBuilder(original.initContainer)
- .withName("spark-init")
- .withImage(initContainerImage)
- .withImagePullPolicy(imagePullPolicy)
- .addAllToEnv(customEnvVars.asJava)
- .addNewVolumeMount()
- .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
- .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
- .endVolumeMount()
- .addToVolumeMounts(sharedVolumeMounts: _*)
- .addToArgs("init")
- .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
- .build()
-
- val podWithBasicVolumes = new PodBuilder(original.pod)
- .editSpec()
- .addNewVolume()
- .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
- .withNewConfigMap()
- .withName(configMapName)
- .addNewItem()
- .withKey(configMapKey)
- .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
- .endItem()
- .endConfigMap()
- .endVolume()
- .addNewVolume()
- .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
- .withEmptyDir(new EmptyDirVolumeSource())
- .endVolume()
- .addNewVolume()
- .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
- .withEmptyDir(new EmptyDirVolumeSource())
- .endVolume()
- .endSpec()
- .build()
-
- val mainContainer = new ContainerBuilder(original.mainContainer)
- .addToVolumeMounts(sharedVolumeMounts: _*)
- .addNewEnv()
- .withName(ENV_MOUNTED_FILES_DIR)
- .withValue(filesDownloadPath)
- .endEnv()
- .build()
-
- PodWithDetachedInitContainer(
- podWithBasicVolumes,
- initContainer,
- mainContainer)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 37331d8..5bc0701 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,10 +16,6 @@
*/
package org.apache.spark.deploy.k8s
-import java.io.File
-
-import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
-
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
@@ -44,71 +40,22 @@ private[spark] object KubernetesUtils {
}
/**
- * Append the given init-container to a pod's list of init-containers.
- *
- * @param originalPodSpec original specification of the pod
- * @param initContainer the init-container to add to the pod
- * @return the pod with the init-container added to the list of InitContainers
- */
- def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
- new PodBuilder(originalPodSpec)
- .editOrNewSpec()
- .addToInitContainers(initContainer)
- .endSpec()
- .build()
- }
-
- /**
* For the given collection of file URIs, resolves them as follows:
- * - File URIs with scheme file:// are resolved to the given download path.
* - File URIs with scheme local:// resolve to just the path of the URI.
* - Otherwise, the URIs are returned as-is.
*/
- def resolveFileUris(
- fileUris: Iterable[String],
- fileDownloadPath: String): Iterable[String] = {
- fileUris.map { uri =>
- resolveFileUri(uri, fileDownloadPath, false)
- }
- }
-
- /**
- * If any file uri has any scheme other than local:// it is mapped as if the file
- * was downloaded to the file download path. Otherwise, it is mapped to the path
- * part of the URI.
- */
- def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
+ def resolveFileUrisAndPath(fileUris: Iterable[String]): Iterable[String] = {
fileUris.map { uri =>
- resolveFileUri(uri, fileDownloadPath, true)
- }
- }
-
- /**
- * Get from a given collection of file URIs the ones that represent remote files.
- */
- def getOnlyRemoteFiles(uris: Iterable[String]): Iterable[String] = {
- uris.filter { uri =>
- val scheme = Utils.resolveURI(uri).getScheme
- scheme != "file" && scheme != "local"
+ resolveFileUri(uri)
}
}
- private def resolveFileUri(
- uri: String,
- fileDownloadPath: String,
- assumesDownloaded: Boolean): String = {
+ private def resolveFileUri(uri: String): String = {
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
- case "local" =>
- fileUri.getPath
- case _ =>
- if (assumesDownloaded || fileScheme == "file") {
- val fileName = new File(fileUri.getPath).getName
- s"$fileDownloadPath/$fileName"
- } else {
- uri
- }
+ case "local" => fileUri.getPath
+ case _ => uri
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
deleted file mode 100644
index 0b79f8b..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import io.fabric8.kubernetes.api.model.{Container, Pod}
-
-/**
- * Represents a pod with a detached init-container (not yet added to the pod).
- *
- * @param pod the pod
- * @param initContainer the init-container in the pod
- * @param mainContainer the main container in the pod
- */
-private[spark] case class PodWithDetachedInitContainer(
- pod: Pod,
- initContainer: Container,
- mainContainer: Container)
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
deleted file mode 100644
index c0f0878..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-/**
- * Process that fetches files from a resource staging server and/or arbitrary remote locations.
- *
- * The init-container can handle fetching files from any of those sources, but not all of the
- * sources need to be specified. This allows for composing multiple instances of this container
- * with different configurations for different download sources, or using the same container to
- * download everything at once.
- */
-private[spark] class SparkPodInitContainer(
- sparkConf: SparkConf,
- fileFetcher: FileFetcher) extends Logging {
-
- private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE)
- private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize))
-
- private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION))
- private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION))
-
- private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS)
- private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES)
-
- private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
-
- def run(): Unit = {
- logInfo(s"Downloading remote jars: $remoteJars")
- downloadFiles(
- remoteJars,
- jarsDownloadDir,
- s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
- "or is not a directory.")
-
- logInfo(s"Downloading remote files: $remoteFiles")
- downloadFiles(
- remoteFiles,
- filesDownloadDir,
- s"Remote files download directory specified at $filesDownloadDir does not exist " +
- "or is not a directory.")
-
- downloadExecutor.shutdown()
- downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
- }
-
- private def downloadFiles(
- filesCommaSeparated: Option[String],
- downloadDir: File,
- errMessage: String): Unit = {
- filesCommaSeparated.foreach { files =>
- require(downloadDir.isDirectory, errMessage)
- Utils.stringToSeq(files).foreach { file =>
- Future[Unit] {
- fileFetcher.fetchFile(file, downloadDir)
- }
- }
- }
- }
-}
-
-private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) {
-
- def fetchFile(uri: String, targetDir: File): Unit = {
- Utils.fetchFile(
- url = uri,
- targetDir = targetDir,
- conf = sparkConf,
- securityMgr = securityManager,
- hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
- timestamp = System.currentTimeMillis(),
- useCache = false)
- }
-}
-
-object SparkPodInitContainer extends Logging {
-
- def main(args: Array[String]): Unit = {
- logInfo("Starting init-container to download Spark application dependencies.")
- val sparkConf = new SparkConf(true)
- if (args.nonEmpty) {
- Utils.loadDefaultSparkProperties(sparkConf, args(0))
- }
-
- val securityManager = new SparkSecurityManager(sparkConf)
- val fileFetcher = new FileFetcher(sparkConf, securityManager)
- new SparkPodInitContainer(sparkConf, fileFetcher).run()
- logInfo("Finished downloading application dependencies.")
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
index ae70904..b4d3f04 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
@@ -16,16 +16,11 @@
*/
package org.apache.spark.deploy.k8s.submit
-import java.util.UUID
-
-import com.google.common.primitives.Longs
-
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.steps._
-import org.apache.spark.deploy.k8s.submit.steps.initcontainer.InitContainerConfigOrchestrator
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.SystemClock
import org.apache.spark.util.Utils
@@ -34,13 +29,11 @@ import org.apache.spark.util.Utils
* Figures out and returns the complete ordered list of needed DriverConfigurationSteps to
* configure the Spark driver pod. The returned steps will be applied one by one in the given
* order to produce a final KubernetesDriverSpec that is used in KubernetesClientApplication
- * to construct and create the driver pod. It uses the InitContainerConfigOrchestrator to
- * configure the driver init-container if one is needed, i.e., when there are remote dependencies
- * to localize.
+ * to construct and create the driver pod.
*/
private[spark] class DriverConfigOrchestrator(
kubernetesAppId: String,
- launchTime: Long,
+ kubernetesResourceNamePrefix: String,
mainAppResource: Option[MainAppResource],
appName: String,
mainClass: String,
@@ -50,15 +43,8 @@ private[spark] class DriverConfigOrchestrator(
// The resource name prefix is derived from the Spark application name, making it easy to connect
// the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
// application the user submitted.
- private val kubernetesResourceNamePrefix = {
- val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)).toString.replaceAll("-", "")
- s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
- }
private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
- private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
- private val jarsDownloadPath = sparkConf.get(JARS_DOWNLOAD_LOCATION)
- private val filesDownloadPath = sparkConf.get(FILES_DOWNLOAD_LOCATION)
def getAllConfigurationSteps: Seq[DriverConfigurationStep] = {
val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -126,9 +112,7 @@ private[spark] class DriverConfigOrchestrator(
val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
Seq(new DependencyResolutionStep(
sparkJars,
- sparkFiles,
- jarsDownloadPath,
- filesDownloadPath))
+ sparkFiles))
} else {
Nil
}
@@ -139,33 +123,12 @@ private[spark] class DriverConfigOrchestrator(
Nil
}
- val initContainerBootstrapStep = if (existNonContainerLocalFiles(sparkJars ++ sparkFiles)) {
- val orchestrator = new InitContainerConfigOrchestrator(
- sparkJars,
- sparkFiles,
- jarsDownloadPath,
- filesDownloadPath,
- imagePullPolicy,
- initContainerConfigMapName,
- INIT_CONTAINER_PROPERTIES_FILE_NAME,
- sparkConf)
- val bootstrapStep = new DriverInitContainerBootstrapStep(
- orchestrator.getAllConfigurationSteps,
- initContainerConfigMapName,
- INIT_CONTAINER_PROPERTIES_FILE_NAME)
-
- Seq(bootstrapStep)
- } else {
- Nil
- }
-
Seq(
initialSubmissionStep,
serviceBootstrapStep,
kubernetesCredentialsStep) ++
dependencyResolutionStep ++
- mountSecretsStep ++
- initContainerBootstrapStep
+ mountSecretsStep
}
private def existSubmissionLocalFiles(files: Seq[String]): Boolean = {
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 5884348..e16d1ad 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -16,14 +16,14 @@
*/
package org.apache.spark.deploy.k8s.submit
+import java.io.StringWriter
import java.util.{Collections, UUID}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.control.NonFatal
+import java.util.Properties
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+import scala.util.control.NonFatal
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.util.Utils
/**
@@ -93,10 +94,8 @@ private[spark] class Client(
kubernetesClient: KubernetesClient,
waitForAppCompletion: Boolean,
appName: String,
- watcher: LoggingPodStatusWatcher) extends Logging {
-
- private val driverJavaOptions = sparkConf.get(
- org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+ watcher: LoggingPodStatusWatcher,
+ kubernetesResourceNamePrefix: String) extends Logging {
/**
* Run command that initializes a DriverSpec that will be updated after each
@@ -110,33 +109,31 @@ private[spark] class Client(
for (nextStep <- submissionSteps) {
currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
}
-
- val resolvedDriverJavaOpts = currentDriverSpec
- .driverSparkConf
- // Remove this as the options are instead extracted and set individually below using
- // environment variables with prefix SPARK_JAVA_OPT_.
- .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
- .getAll
- .map {
- case (confKey, confValue) => s"-D$confKey=$confValue"
- } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
- val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
- case (option, index) =>
- new EnvVarBuilder()
- .withName(s"$ENV_JAVA_OPT_PREFIX$index")
- .withValue(option)
- .build()
- }
-
+ val configMapName = s"$kubernetesResourceNamePrefix-driver-conf-map"
+ val configMap = buildConfigMap(configMapName, currentDriverSpec.driverSparkConf)
+ // Set the "SPARK_CONF_DIR" environment variable so that the Spark command
+ // builder picks up the Java options present in the ConfigMap
val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
- .addAllToEnv(driverJavaOptsEnvs.asJava)
+ .addNewEnv()
+ .withName(ENV_SPARK_CONF_DIR)
+ .withValue(SPARK_CONF_DIR_INTERNAL)
+ .endEnv()
+ .addNewVolumeMount()
+ .withName(SPARK_CONF_VOLUME)
+ .withMountPath(SPARK_CONF_DIR_INTERNAL)
+ .endVolumeMount()
.build()
val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
.editSpec()
.addToContainers(resolvedDriverContainer)
+ .addNewVolume()
+ .withName(SPARK_CONF_VOLUME)
+ .withNewConfigMap()
+ .withName(configMapName)
+ .endConfigMap()
+ .endVolume()
.endSpec()
.build()
-
Utils.tryWithResource(
kubernetesClient
.pods()
@@ -145,7 +142,8 @@ private[spark] class Client(
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
- val otherKubernetesResources = currentDriverSpec.otherKubernetesResources
+ val otherKubernetesResources =
+ currentDriverSpec.otherKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
}
@@ -180,6 +178,26 @@ private[spark] class Client(
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
}
}
+
+ // Build a ConfigMap that holds the Spark conf properties in a single file for spark-submit
+ private def buildConfigMap(configMapName: String, conf: SparkConf): ConfigMap = {
+ val properties = new Properties()
+ conf.getAll.foreach { case (k, v) =>
+ properties.setProperty(k, v)
+ }
+ val propertiesWriter = new StringWriter()
+ properties.store(propertiesWriter,
+ s"Java properties built from Kubernetes config map with name: $configMapName")
+
+ val namespace = conf.get(KUBERNETES_NAMESPACE)
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(configMapName)
+ .withNamespace(namespace)
+ .endMetadata()
+ .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
+ .build()
+ }
}
/**
@@ -202,6 +220,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val launchTime = System.currentTimeMillis()
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
+ val kubernetesResourceNamePrefix = {
+ s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+ }
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
val master = sparkConf.get("spark.master").substring("k8s://".length)
@@ -211,7 +232,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val orchestrator = new DriverConfigOrchestrator(
kubernetesAppId,
- launchTime,
+ kubernetesResourceNamePrefix,
clientArguments.mainAppResource,
appName,
clientArguments.mainClass,
@@ -231,7 +252,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
kubernetesClient,
waitForAppCompletion,
appName,
- watcher)
+ watcher,
+ kubernetesResourceNamePrefix)
client.run()
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
index 164e2e5..347c4d2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
+import org.apache.spark.launcher.SparkLauncher
/**
* Performs basic configuration for the driver pod.
@@ -56,8 +57,6 @@ private[spark] class BasicDriverConfigurationStep(
// Memory settings
private val driverMemoryMiB = sparkConf.get(DRIVER_MEMORY)
- private val driverMemoryString = sparkConf.get(
- DRIVER_MEMORY.key, DRIVER_MEMORY.defaultValueString)
private val memoryOverheadMiB = sparkConf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
@@ -103,25 +102,13 @@ private[spark] class BasicDriverConfigurationStep(
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
}
- val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
+ val driverContainerWithoutArgs = new ContainerBuilder(driverSpec.driverContainer)
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverContainerImage)
.withImagePullPolicy(imagePullPolicy)
.addAllToEnv(driverCustomEnvs.asJava)
.addToEnv(driverExtraClasspathEnv.toSeq: _*)
.addNewEnv()
- .withName(ENV_DRIVER_MEMORY)
- .withValue(driverMemoryString)
- .endEnv()
- .addNewEnv()
- .withName(ENV_DRIVER_MAIN_CLASS)
- .withValue(mainClass)
- .endEnv()
- .addNewEnv()
- .withName(ENV_DRIVER_ARGS)
- .withValue(appArgs.mkString(" "))
- .endEnv()
- .addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
@@ -134,7 +121,16 @@ private[spark] class BasicDriverConfigurationStep(
.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
.endResources()
.addToArgs("driver")
- .build()
+ .addToArgs("--properties-file", SPARK_CONF_PATH)
+ .addToArgs("--class", mainClass)
+ // The user application jar is merged into the spark.jars list and managed through that
+ // property, so there is no need to reference it explicitly here.
+ .addToArgs(SparkLauncher.NO_RESOURCE)
+
+ val driverContainer = appArgs.toList match {
+ case "" :: Nil | Nil => driverContainerWithoutArgs.build()
+ case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build()
+ }
val baseDriverPod = new PodBuilder(driverSpec.driverPod)
.editOrNewMetadata()
@@ -152,10 +148,14 @@ private[spark] class BasicDriverConfigurationStep(
.setIfMissing(KUBERNETES_DRIVER_POD_NAME, driverPodName)
.set("spark.app.id", kubernetesAppId)
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, resourceNamePrefix)
+ // Set this config flag to allow client-mode spark-submit from the driver
+ .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
driverSpec.copy(
driverPod = baseDriverPod,
driverSparkConf = resolvedSparkConf,
driverContainer = driverContainer)
}
+
}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
index d4b8323..43de329 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
@@ -30,13 +30,11 @@ import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
*/
private[spark] class DependencyResolutionStep(
sparkJars: Seq[String],
- sparkFiles: Seq[String],
- jarsDownloadPath: String,
- filesDownloadPath: String) extends DriverConfigurationStep {
+ sparkFiles: Seq[String]) extends DriverConfigurationStep {
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
- val resolvedSparkJars = KubernetesUtils.resolveFileUris(sparkJars, jarsDownloadPath)
- val resolvedSparkFiles = KubernetesUtils.resolveFileUris(sparkFiles, filesDownloadPath)
+ val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(sparkJars)
+ val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(sparkFiles)
val sparkConf = driverSpec.driverSparkConf.clone()
if (resolvedSparkJars.nonEmpty) {
@@ -45,14 +43,12 @@ private[spark] class DependencyResolutionStep(
if (resolvedSparkFiles.nonEmpty) {
sparkConf.set("spark.files", resolvedSparkFiles.mkString(","))
}
-
- val resolvedClasspath = KubernetesUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
- val resolvedDriverContainer = if (resolvedClasspath.nonEmpty) {
+ val resolvedDriverContainer = if (resolvedSparkJars.nonEmpty) {
new ContainerBuilder(driverSpec.driverContainer)
.addNewEnv()
- .withName(ENV_MOUNTED_CLASSPATH)
- .withValue(resolvedClasspath.mkString(File.pathSeparator))
- .endEnv()
+ .withName(ENV_MOUNTED_CLASSPATH)
+ .withValue(resolvedSparkJars.mkString(File.pathSeparator))
+ .endEnv()
.build()
} else {
driverSpec.driverContainer
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
deleted file mode 100644
index 9fb3daf..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.StringWriter
-import java.util.Properties
-
-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, ContainerBuilder, HasMetadata}
-
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
-
-/**
- * Configures the driver init-container that localizes remote dependencies into the driver pod.
- * It applies the given InitContainerConfigurationSteps in the given order to produce a final
- * InitContainerSpec that is then used to configure the driver pod with the init-container attached.
- * It also builds a ConfigMap that will be mounted into the init-container. The ConfigMap carries
- * configuration properties for the init-container.
- */
-private[spark] class DriverInitContainerBootstrapStep(
- steps: Seq[InitContainerConfigurationStep],
- configMapName: String,
- configMapKey: String)
- extends DriverConfigurationStep {
-
- override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
- var initContainerSpec = InitContainerSpec(
- properties = Map.empty[String, String],
- driverSparkConf = Map.empty[String, String],
- initContainer = new ContainerBuilder().build(),
- driverContainer = driverSpec.driverContainer,
- driverPod = driverSpec.driverPod,
- dependentResources = Seq.empty[HasMetadata])
- for (nextStep <- steps) {
- initContainerSpec = nextStep.configureInitContainer(initContainerSpec)
- }
-
- val configMap = buildConfigMap(
- configMapName,
- configMapKey,
- initContainerSpec.properties)
- val resolvedDriverSparkConf = driverSpec.driverSparkConf
- .clone()
- .set(INIT_CONTAINER_CONFIG_MAP_NAME, configMapName)
- .set(INIT_CONTAINER_CONFIG_MAP_KEY_CONF, configMapKey)
- .setAll(initContainerSpec.driverSparkConf)
- val resolvedDriverPod = KubernetesUtils.appendInitContainer(
- initContainerSpec.driverPod, initContainerSpec.initContainer)
-
- driverSpec.copy(
- driverPod = resolvedDriverPod,
- driverContainer = initContainerSpec.driverContainer,
- driverSparkConf = resolvedDriverSparkConf,
- otherKubernetesResources =
- driverSpec.otherKubernetesResources ++
- initContainerSpec.dependentResources ++
- Seq(configMap))
- }
-
- private def buildConfigMap(
- configMapName: String,
- configMapKey: String,
- config: Map[String, String]): ConfigMap = {
- val properties = new Properties()
- config.foreach { entry =>
- properties.setProperty(entry._1, entry._2)
- }
- val propertiesWriter = new StringWriter()
- properties.store(propertiesWriter,
- s"Java properties built from Kubernetes config map with name: $configMapName " +
- s"and config map key: $configMapKey")
- new ConfigMapBuilder()
- .withNewMetadata()
- .withName(configMapName)
- .endMetadata()
- .addToData(configMapKey, propertiesWriter.toString)
- .build()
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
index ccc1890..2424e63 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
@@ -99,7 +99,7 @@ private[spark] class DriverKubernetesCredentialsStep(
}.getOrElse(driverSpec.driverPod)
)
- val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { secret =>
+ val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { _ =>
new ContainerBuilder(driverSpec.driverContainer)
.addNewVolumeMount()
.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
deleted file mode 100644
index 0146985..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-
-/**
- * Performs basic configuration for the driver init-container with most of the work delegated to
- * the given InitContainerBootstrap.
- */
-private[spark] class BasicInitContainerConfigurationStep(
- sparkJars: Seq[String],
- sparkFiles: Seq[String],
- jarsDownloadPath: String,
- filesDownloadPath: String,
- bootstrap: InitContainerBootstrap)
- extends InitContainerConfigurationStep {
-
- override def configureInitContainer(spec: InitContainerSpec): InitContainerSpec = {
- val remoteJarsToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkJars)
- val remoteFilesToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkFiles)
- val remoteJarsConf = if (remoteJarsToDownload.nonEmpty) {
- Map(INIT_CONTAINER_REMOTE_JARS.key -> remoteJarsToDownload.mkString(","))
- } else {
- Map()
- }
- val remoteFilesConf = if (remoteFilesToDownload.nonEmpty) {
- Map(INIT_CONTAINER_REMOTE_FILES.key -> remoteFilesToDownload.mkString(","))
- } else {
- Map()
- }
-
- val baseInitContainerConfig = Map(
- JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath,
- FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath) ++
- remoteJarsConf ++
- remoteFilesConf
-
- val bootstrapped = bootstrap.bootstrapInitContainer(
- PodWithDetachedInitContainer(
- spec.driverPod,
- spec.initContainer,
- spec.driverContainer))
-
- spec.copy(
- initContainer = bootstrapped.initContainer,
- driverContainer = bootstrapped.mainContainer,
- driverPod = bootstrapped.pod,
- properties = spec.properties ++ baseInitContainerConfig)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
deleted file mode 100644
index f2c29c7..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-
-/**
- * Figures out and returns the complete ordered list of InitContainerConfigurationSteps required to
- * configure the driver init-container. The returned steps will be applied in the given order to
- * produce a final InitContainerSpec that is used to construct the driver init-container in
- * DriverInitContainerBootstrapStep. This class is only used when an init-container is needed, i.e.,
- * when there are remote application dependencies to localize.
- */
-private[spark] class InitContainerConfigOrchestrator(
- sparkJars: Seq[String],
- sparkFiles: Seq[String],
- jarsDownloadPath: String,
- filesDownloadPath: String,
- imagePullPolicy: String,
- configMapName: String,
- configMapKey: String,
- sparkConf: SparkConf) {
-
- private val initContainerImage = sparkConf
- .get(INIT_CONTAINER_IMAGE)
- .getOrElse(throw new SparkException(
- "Must specify the init-container image when there are remote dependencies"))
-
- def getAllConfigurationSteps: Seq[InitContainerConfigurationStep] = {
- val initContainerBootstrap = new InitContainerBootstrap(
- initContainerImage,
- imagePullPolicy,
- jarsDownloadPath,
- filesDownloadPath,
- configMapName,
- configMapKey,
- SPARK_POD_DRIVER_ROLE,
- sparkConf)
- val baseStep = new BasicInitContainerConfigurationStep(
- sparkJars,
- sparkFiles,
- jarsDownloadPath,
- filesDownloadPath,
- initContainerBootstrap)
-
- val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf,
- KUBERNETES_DRIVER_SECRETS_PREFIX)
- // Mount user-specified driver secrets also into the driver's init-container. The
- // init-container may need credentials in the secrets to be able to download remote
- // dependencies. The driver's main container and its init-container share the secrets
- // because the init-container is sort of an implementation details and this sharing
- // avoids introducing a dedicated configuration property just for the init-container.
- val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
- Seq(new InitContainerMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
- } else {
- Nil
- }
-
- Seq(baseStep) ++ mountSecretsStep
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
deleted file mode 100644
index 0372ad5..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-/**
- * Represents a step in configuring the driver init-container.
- */
-private[spark] trait InitContainerConfigurationStep {
-
- def configureInitContainer(spec: InitContainerSpec): InitContainerSpec
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
deleted file mode 100644
index 0daa7b9..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-
-/**
- * An init-container configuration step for mounting user-specified secrets onto user-specified
- * paths.
- *
- * @param bootstrap a utility actually handling mounting of the secrets
- */
-private[spark] class InitContainerMountSecretsStep(
- bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
-
- override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
- // Mount the secret volumes given that the volumes have already been added to the driver pod
- // when mounting the secrets into the main driver container.
- val initContainer = bootstrap.mountSecrets(spec.initContainer)
- spec.copy(initContainer = initContainer)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
deleted file mode 100644
index b52c343..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps.initcontainer
-
-import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}
-
-/**
- * Represents a specification of the init-container for the driver pod.
- *
- * @param properties properties that should be set on the init-container
- * @param driverSparkConf Spark configuration properties that will be carried back to the driver
- * @param initContainer the init-container object
- * @param driverContainer the driver container object
- * @param driverPod the driver pod object
- * @param dependentResources resources the init-container depends on to work
- */
-private[spark] case class InitContainerSpec(
- properties: Map[String, String],
- driverSparkConf: Map[String, String],
- initContainer: Container,
- driverContainer: Container,
- driverPod: Pod,
- dependentResources: Seq[HasMetadata])
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 141bd28..98cbd56 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model._
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, PodWithDetachedInitContainer}
+import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
@@ -34,18 +34,10 @@ import org.apache.spark.util.Utils
* @param sparkConf Spark configuration
* @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
* user-specified paths into the executor container
- * @param initContainerBootstrap an optional component for bootstrapping the executor init-container
- * if one is needed, i.e., when there are remote dependencies to
- * localize
- * @param initContainerMountSecretsBootstrap an optional component for mounting user-specified
- * secrets onto user-specified paths into the executor
- * init-container
*/
private[spark] class ExecutorPodFactory(
sparkConf: SparkConf,
- mountSecretsBootstrap: Option[MountSecretsBootstrap],
- initContainerBootstrap: Option[InitContainerBootstrap],
- initContainerMountSecretsBootstrap: Option[MountSecretsBootstrap]) {
+ mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
@@ -94,8 +86,6 @@ private[spark] class ExecutorPodFactory(
private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
- private val executorJarsDownloadDir = sparkConf.get(JARS_DOWNLOAD_LOCATION)
-
/**
* Configure and construct an executor pod with the given parameters.
*/
@@ -147,8 +137,9 @@ private[spark] class ExecutorPodFactory(
(ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, applicationId),
- (ENV_EXECUTOR_ID, executorId),
- (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs)
+ // Set SPARK_CONF_DIR to /opt/spark/conf.
+ (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
+ (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs)
.map(env => new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
@@ -221,30 +212,10 @@ private[spark] class ExecutorPodFactory(
(bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
}.getOrElse((executorPod, containerWithLimitCores))
- val (bootstrappedPod, bootstrappedContainer) =
- initContainerBootstrap.map { bootstrap =>
- val podWithInitContainer = bootstrap.bootstrapInitContainer(
- PodWithDetachedInitContainer(
- maybeSecretsMountedPod,
- new ContainerBuilder().build(),
- maybeSecretsMountedContainer))
-
- val (pod, mayBeSecretsMountedInitContainer) =
- initContainerMountSecretsBootstrap.map { bootstrap =>
- // Mount the secret volumes given that the volumes have already been added to the
- // executor pod when mounting the secrets into the main executor container.
- (podWithInitContainer.pod, bootstrap.mountSecrets(podWithInitContainer.initContainer))
- }.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))
-
- val bootstrappedPod = KubernetesUtils.appendInitContainer(
- pod, mayBeSecretsMountedInitContainer)
-
- (bootstrappedPod, podWithInitContainer.mainContainer)
- }.getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
- new PodBuilder(bootstrappedPod)
+ new PodBuilder(maybeSecretsMountedPod)
.editSpec()
- .addToContainers(bootstrappedContainer)
+ .addToContainers(maybeSecretsMountedContainer)
.endSpec()
.build()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index a942db6..ff5f680 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,7 +21,7 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
@@ -33,7 +33,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
- if (masterURL.startsWith("k8s") && sc.deployMode == "client") {
+ if (masterURL.startsWith("k8s") &&
+ sc.deployMode == "client" &&
+ !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
throw new SparkException("Client mode is currently not supported for Kubernetes.")
}
@@ -44,74 +46,23 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
- val sparkConf = sc.getConf
- val initContainerConfigMap = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_NAME)
- val initContainerConfigMapKey = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_KEY_CONF)
-
- if (initContainerConfigMap.isEmpty) {
- logWarning("The executor's init-container config map is not specified. Executors will " +
- "therefore not attempt to fetch remote or submitted dependencies.")
- }
-
- if (initContainerConfigMapKey.isEmpty) {
- logWarning("The executor's init-container config map key is not specified. Executors will " +
- "therefore not attempt to fetch remote or submitted dependencies.")
- }
-
- // Only set up the bootstrap if they've provided both the config map key and the config map
- // name. The config map might not be provided if init-containers aren't being used to
- // bootstrap dependencies.
- val initContainerBootstrap = for {
- configMap <- initContainerConfigMap
- configMapKey <- initContainerConfigMapKey
- } yield {
- val initContainerImage = sparkConf
- .get(INIT_CONTAINER_IMAGE)
- .getOrElse(throw new SparkException(
- "Must specify the init-container image when there are remote dependencies"))
- new InitContainerBootstrap(
- initContainerImage,
- sparkConf.get(CONTAINER_IMAGE_PULL_POLICY),
- sparkConf.get(JARS_DOWNLOAD_LOCATION),
- sparkConf.get(FILES_DOWNLOAD_LOCATION),
- configMap,
- configMapKey,
- SPARK_POD_EXECUTOR_ROLE,
- sparkConf)
- }
-
val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+ sc.conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
- // Mount user-specified executor secrets also into the executor's init-container. The
- // init-container may need credentials in the secrets to be able to download remote
- // dependencies. The executor's main container and its init-container share the secrets
- // because the init-container is sort of an implementation details and this sharing
- // avoids introducing a dedicated configuration property just for the init-container.
- val initContainerMountSecretsBootstrap = if (initContainerBootstrap.nonEmpty &&
- executorSecretNamesToMountPaths.nonEmpty) {
- Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
- } else {
- None
- }
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
KUBERNETES_MASTER_INTERNAL_URL,
- Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+ Some(sc.conf.get(KUBERNETES_NAMESPACE)),
KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
- sparkConf,
+ sc.conf,
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
- val executorPodFactory = new ExecutorPodFactory(
- sparkConf,
- mountSecretBootstrap,
- initContainerBootstrap,
- initContainerMountSecretsBootstrap)
+ val executorPodFactory = new ExecutorPodFactory(sc.conf, mountSecretBootstrap)
val allocatorExecutor = ThreadUtils
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
http://git-wip-us.apache.org/repos/asf/spark/blob/f15906da/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
deleted file mode 100644
index e0f29ec..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import java.io.File
-import java.util.UUID
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import org.mockito.Mockito
-import org.scalatest.BeforeAndAfter
-import org.scalatest.mockito.MockitoSugar._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.util.Utils
-
-class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter {
-
- private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt")
- private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt")
-
- private var downloadJarsDir: File = _
- private var downloadFilesDir: File = _
- private var downloadJarsSecretValue: String = _
- private var downloadFilesSecretValue: String = _
- private var fileFetcher: FileFetcher = _
-
- override def beforeAll(): Unit = {
- downloadJarsSecretValue = Files.toString(
- new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8)
- downloadFilesSecretValue = Files.toString(
- new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8)
- }
-
- before {
- downloadJarsDir = Utils.createTempDir()
- downloadFilesDir = Utils.createTempDir()
- fileFetcher = mock[FileFetcher]
- }
-
- after {
- downloadJarsDir.delete()
- downloadFilesDir.delete()
- }
-
- test("Downloads from remote server should invoke the file fetcher") {
- val sparkConf = getSparkConfForRemoteFileDownloads
- val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher)
- initContainerUnderTest.run()
- Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
- Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
- Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
- }
-
- private def getSparkConfForRemoteFileDownloads: SparkConf = {
- new SparkConf(true)
- .set(INIT_CONTAINER_REMOTE_JARS,
- "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar")
- .set(INIT_CONTAINER_REMOTE_FILES,
- "http://localhost:9000/file.txt")
- .set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
- .set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
- }
-
- private def createTempFile(extension: String): String = {
- val dir = Utils.createTempDir()
- val file = new File(dir, s"${UUID.randomUUID().toString}.$extension")
- Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8)
- file.getAbsolutePath
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org