You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by ifilonenko <gi...@git.apache.org> on 2018/08/29 17:31:27 UTC
[GitHub] spark pull request #22256: [SPARK-25262][K8S] Better support configurability...
Github user ifilonenko commented on a diff in the pull request:
https://github.com/apache/spark/pull/22256#discussion_r213768267
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala ---
@@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep(
.orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir)
.split(",")
+ private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (localDir, index) =>
- new VolumeBuilder()
- .withName(s"spark-local-dir-${index + 1}")
- .withNewEmptyDir()
- .endEmptyDir()
- .build()
+ val name = s"spark-local-dir-${index + 1}"
+ // To allow customisation of local dirs backing volumes we should avoid creating
+ // emptyDir volumes if the volume is already defined in the pod spec
+ hasVolume(pod, name) match {
+ case true =>
+ // For pre-existing volume definitions just re-use the volume
+ pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get
+ case false =>
+ // Create new emptyDir volume
+ new VolumeBuilder()
+ .withName(name)
+ .withNewEmptyDir()
+ .withMedium(useLocalDirTmpFs match {
+ case true => "Memory" // Use tmpfs
+ case false => null // Default - use nodes backing storage
+ })
+ .endEmptyDir()
+ .build()
+ }
}
+
val localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
- new VolumeMountBuilder()
- .withName(localDirVolume.getName)
- .withMountPath(localDirPath)
- .build()
+ hasVolumeMount(pod, localDirVolume.getName, localDirPath) match {
+ case true =>
+ // For pre-existing volume mounts just re-use the mount
+ pod.container.getVolumeMounts().asScala
+ .find(m => m.getName.equals(localDirVolume.getName)
+ && m.getMountPath.equals(localDirPath))
+ .get
+ case false =>
+ // Create new volume mount
+ new VolumeMountBuilder()
+ .withName (localDirVolume.getName)
+ .withMountPath (localDirPath)
+ .build()
+ }
+ }
+
+ // Check for conflicting volume mounts
+ for (m: VolumeMount <- localDirVolumeMounts) {
+ if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 0) {
+ throw new SparkException(s"Conflicting volume mounts defined, pod template attempted to " +
s"mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " +
s"than the expected ${m.getMountPath}")
}
+ }
+
val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec()
- .addToVolumes(localDirVolumes: _*)
+ // Don't want to re-add volumes that already existed in the incoming spec
+ // as duplicate definitions will lead to K8S API errors
+ .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*)
--- End diff --
Checking current volumes in a feature step isn't consistent with the additive design of the feature builder pattern. @mccheah to comment
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org