Posted to commits@spark.apache.org by fe...@apache.org on 2018/07/11 05:53:50 UTC

spark git commit: [SPARK-23529][K8S] Support mounting volumes

Repository: spark
Updated Branches:
  refs/heads/master 74a8d6308 -> 5ff1b9ba1


[SPARK-23529][K8S] Support mounting volumes

This PR continues #21095 and intersects with #21238. I've added volume mounts as a separate step and added PersistentVolumeClaim support.

There is a fundamental problem with how we pass the options through Spark conf to fabric8: for each volume type and every possible volume option we would have to implement custom code that maps config values to fabric8 calls. This results in a large body of code we would have to maintain and means that Spark will always lag somewhat behind k8s.

I think there needs to be a discussion on how to proceed correctly (e.g. use PodPreset instead).
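Concretely, the per-type mapping referred to above is a dispatch like the sketch below, which mirrors the MountVolumesFeatureStep added in this diff; every additional volume type Spark chooses to support needs another case and its own option handling:

    // One conf-to-fabric8 mapping per supported Kubernetes volume type:
    spec.volumeConf match {
      case KubernetesHostPathVolumeConf(hostPath) =>
        new VolumeBuilder().withHostPath(new HostPathVolumeSource(hostPath))
      case KubernetesPVCVolumeConf(claimName) =>
        new VolumeBuilder().withPersistentVolumeClaim(
          new PersistentVolumeClaimVolumeSource(claimName, spec.mountReadOnly))
      // ...and so on for each new type and each of its options
    }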

----

Due to the complications of provisioning and managing actual resources, this PR addresses only the mounting of volumes that already exist.
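For example, mounting an existing PersistentVolumeClaim into the driver takes only the new properties (a sketch; the claim spark-pvc-claim is assumed to already exist in the cluster):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path",
        "/checkpoint")
      .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly",
        "false")
      .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName",
        "spark-pvc-claim")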

----
- [x] emptyDir support
- [x] Testing
- [x] Documentation
- [x] KubernetesVolumeUtils tests

Author: Andrew Korzhuev <an...@klarna.com>
Author: madanadit <ad...@alluxio.com>

Closes #21260 from andrusha/k8s-vol.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ff1b9ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ff1b9ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ff1b9ba

Branch: refs/heads/master
Commit: 5ff1b9ba1983d5601add62aef64a3e87d07050eb
Parents: 74a8d63
Author: Andrew Korzhuev <an...@klarna.com>
Authored: Tue Jul 10 22:53:44 2018 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Tue Jul 10 22:53:44 2018 -0700

----------------------------------------------------------------------
 docs/running-on-kubernetes.md                   |  48 +++++++
 .../org/apache/spark/deploy/k8s/Config.scala    |  12 ++
 .../spark/deploy/k8s/KubernetesConf.scala       |  11 ++
 .../spark/deploy/k8s/KubernetesUtils.scala      |   2 -
 .../spark/deploy/k8s/KubernetesVolumeSpec.scala |  38 +++++
 .../deploy/k8s/KubernetesVolumeUtils.scala      | 110 ++++++++++++++
 .../k8s/features/BasicDriverFeatureStep.scala   |   5 +-
 .../k8s/features/BasicExecutorFeatureStep.scala |   5 +-
 .../k8s/features/MountVolumesFeatureStep.scala  |  79 ++++++++++
 .../k8s/submit/KubernetesDriverBuilder.scala    |  31 ++--
 .../cluster/k8s/KubernetesExecutorBuilder.scala |  38 ++---
 .../deploy/k8s/KubernetesVolumeUtilsSuite.scala | 106 ++++++++++++++
 .../features/BasicDriverFeatureStepSuite.scala  |  23 +--
 .../BasicExecutorFeatureStepSuite.scala         |   3 +
 ...rKubernetesCredentialsFeatureStepSuite.scala |   3 +
 .../DriverServiceFeatureStepSuite.scala         |   6 +
 .../features/EnvSecretsFeatureStepSuite.scala   |   1 +
 .../features/LocalDirsFeatureStepSuite.scala    |   3 +-
 .../features/MountSecretsFeatureStepSuite.scala |   1 +
 .../features/MountVolumesFeatureStepSuite.scala | 144 +++++++++++++++++++
 .../bindings/JavaDriverFeatureStepSuite.scala   |   1 +
 .../bindings/PythonDriverFeatureStepSuite.scala |   2 +
 .../spark/deploy/k8s/submit/ClientSuite.scala   |   1 +
 .../submit/KubernetesDriverBuilderSuite.scala   |  45 +++++-
 .../k8s/KubernetesExecutorBuilderSuite.scala    |  38 ++++-
 25 files changed, 705 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 408e446..7149616 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -629,6 +629,54 @@ specific to Spark on Kubernetes.
    Add as an environment variable to the executor container with name EnvName (case sensitive), the value referenced by key <code> key </code> in the data of the referenced <a href="https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables">Kubernetes Secret</a>. For example,
    <code>spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key</code>.
   </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path</code></td>
+  <td>false</td>
+  <td>
+   Add the <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> named <code>VolumeName</code> of the <code>VolumeType</code> type to the driver pod on the path specified in the value. For example,
+   <code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
+  <td>(none)</td>
+  <td>
+   Specify whether the mounted volume is read-only. For example,
+   <code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName]</code></td>
+  <td>(none)</td>
+  <td>
+   Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> options passed to Kubernetes, using <code>OptionName</code> as the key and the property value as the value. Option names must conform to the Kubernetes option format. For example,
+   <code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path</code></td>
+  <td>(none)</td>
+  <td>
+   Add the <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> named <code>VolumeName</code> of the <code>VolumeType</code> type to the executor pod on the path specified in the value. For example,
+   <code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
+  <td>false</td>
+  <td>
+   Specify whether the mounted volume is read-only. For example,
+   <code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].options.[OptionName]</code></td>
+  <td>(none)</td>
+  <td>
+   Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> options passed to Kubernetes, using <code>OptionName</code> as the key and the property value as the value. For example,
+   <code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim</code>.
+  </td>
 </tr>
 <tr>
   <td><code>spark.kubernetes.memoryOverheadFactor</code></td>

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index bf33179..f9a77e7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -220,11 +220,23 @@ private[spark] object Config extends Logging {
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
   val KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX = "spark.kubernetes.driver.secretKeyRef."
+  val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes."
 
   val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
   val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
   val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
   val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef."
+  val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes."
+
+  val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
+  val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
+  val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
+  val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
+  val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
+  val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
+  val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
+  val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
+  val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
 
   val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
 }
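A volume property decomposes against the constants above as prefix + volume type + volume name + key; a minimal sketch (the volume name "checkpointpvc" is just an example):

    val key = "spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path"
    val rest = key.stripPrefix(KUBERNETES_DRIVER_VOLUMES_PREFIX)
    // rest == "persistentVolumeClaim.checkpointpvc.mount.path"
    //          <volumeType>.<volumeName>.<KUBERNETES_VOLUMES_MOUNT_PATH_KEY>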

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index b0ccaa3..51d205f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -59,6 +59,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     roleSecretNamesToMountPaths: Map[String, String],
     roleSecretEnvNamesToKeyRefs: Map[String, String],
     roleEnvs: Map[String, String],
+    roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
     sparkFiles: Seq[String]) {
 
   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
@@ -155,6 +156,12 @@ private[spark] object KubernetesConf {
       sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
     val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
+    val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
+      sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get)
+    // Also parse executor volumes in order to verify configuration
+    // before the driver pod is created
+    KubernetesVolumeUtils.parseVolumesWithPrefix(
+      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
 
     val sparkFiles = sparkConf
       .getOption("spark.files")
@@ -171,6 +178,7 @@ private[spark] object KubernetesConf {
       driverSecretNamesToMountPaths,
       driverSecretEnvNamesToKeyRefs,
       driverEnvs,
+      driverVolumes,
       sparkFiles)
   }
 
@@ -203,6 +211,8 @@ private[spark] object KubernetesConf {
     val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
     val executorEnv = sparkConf.getExecutorEnv.toMap
+    val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
+      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
 
     KubernetesConf(
       sparkConf.clone(),
@@ -214,6 +224,7 @@ private[spark] object KubernetesConf {
       executorMountSecrets,
       executorEnvSecrets,
       executorEnv,
+      executorVolumes,
       Seq.empty[String])
   }
 }
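Note the .map(_.get) in both hunks above: parseVolumesWithPrefix returns Try values, and Try.get rethrows the wrapped failure, so a malformed volume property (driver or executor) aborts at submission time rather than after the driver pod is created. Roughly:

    import java.util.NoSuchElementException
    import scala.util.{Failure, Try}

    val parsed: Iterable[Try[String]] =
      Seq(Failure(new NoSuchElementException("options.claimName")))
    parsed.map(_.get)  // throws NoSuchElementException here, at submit time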

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 593fb53..66fff26 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.k8s
 
-import io.fabric8.kubernetes.api.model.LocalObjectReference
-
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
new file mode 100644
index 0000000..b1762d1
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+private[spark] sealed trait KubernetesVolumeSpecificConf
+
+private[spark] case class KubernetesHostPathVolumeConf(
+    hostPath: String)
+  extends KubernetesVolumeSpecificConf
+
+private[spark] case class KubernetesPVCVolumeConf(
+    claimName: String)
+  extends KubernetesVolumeSpecificConf
+
+private[spark] case class KubernetesEmptyDirVolumeConf(
+    medium: Option[String],
+    sizeLimit: Option[String])
+  extends KubernetesVolumeSpecificConf
+
+private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf](
+    volumeName: String,
+    mountPath: String,
+    mountReadOnly: Boolean,
+    volumeConf: T)
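Constructing a spec by hand looks like this (a sketch with illustrative values):

    val spec = KubernetesVolumeSpec(
      volumeName = "checkpointpvc",
      mountPath = "/checkpoint",
      mountReadOnly = false,
      volumeConf = KubernetesPVCVolumeConf(claimName = "spark-pvc-claim"))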

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
new file mode 100644
index 0000000..713df5f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.NoSuchElementException
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+
+private[spark] object KubernetesVolumeUtils {
+  /**
+   * Extract Spark volume configuration properties with a given name prefix.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @return an Iterable of Try-wrapped KubernetesVolumeSpec values, one per parsed volume
+   */
+  def parseVolumesWithPrefix(
+    sparkConf: SparkConf,
+    prefix: String): Iterable[Try[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]] = {
+    val properties = sparkConf.getAllWithPrefix(prefix).toMap
+
+    getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
+      val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
+      val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
+
+      for {
+        path <- properties.getTry(pathKey)
+        volumeConf <- parseVolumeSpecificConf(properties, volumeType, volumeName)
+      } yield KubernetesVolumeSpec(
+        volumeName = volumeName,
+        mountPath = path,
+        mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
+        volumeConf = volumeConf
+      )
+    }
+  }
+
+  /**
+   * Get unique pairs of volumeType and volumeName,
+   * assuming options are formatted in this way:
+   * `volumeType`.`volumeName`.`property` = `value`
+   * @param properties flat mapping of property names to values
+   * @return Set[(volumeType, volumeName)]
+   */
+  private def getVolumeTypesAndNames(
+    properties: Map[String, String]
+  ): Set[(String, String)] = {
+    properties.keys.flatMap { k =>
+      k.split('.').toList match {
+        case tpe :: name :: _ => Some((tpe, name))
+        case _ => None
+      }
+    }.toSet
+  }
+
+  private def parseVolumeSpecificConf(
+    options: Map[String, String],
+    volumeType: String,
+    volumeName: String): Try[KubernetesVolumeSpecificConf] = {
+    volumeType match {
+      case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
+        val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
+        for {
+          path <- options.getTry(pathKey)
+        } yield KubernetesHostPathVolumeConf(path)
+
+      case KUBERNETES_VOLUMES_PVC_TYPE =>
+        val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
+        for {
+          claimName <- options.getTry(claimNameKey)
+        } yield KubernetesPVCVolumeConf(claimName)
+
+      case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
+        val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
+        val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
+        Success(KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey)))
+
+      case _ =>
+        Failure(new RuntimeException(s"Kubernetes Volume type `$volumeType` is not supported"))
+    }
+  }
+
+  /**
+   * Convenience wrapper to accumulate key lookup errors
+   */
+  implicit private class MapOps[A, B](m: Map[A, B]) {
+    def getTry(key: A): Try[B] = {
+      m
+        .get(key)
+        .fold[Try[B]](Failure(new NoSuchElementException(key.toString)))(Success(_))
+    }
+  }
+}
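A usage sketch for the parser above ("logs" is an example volume name; imports follow the packages in this diff):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_VOLUMES_PREFIX
    import org.apache.spark.deploy.k8s.KubernetesVolumeUtils

    val conf = new SparkConf(false)
      .set("spark.kubernetes.driver.volumes.hostPath.logs.mount.path", "/var/log/spark")
      .set("spark.kubernetes.driver.volumes.hostPath.logs.options.path", "/var/log")
    val specs = KubernetesVolumeUtils
      .parseVolumesWithPrefix(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
      .map(_.get)  // fails fast on malformed keys, as KubernetesConf does
    // specs.head == KubernetesVolumeSpec("logs", "/var/log/spark", false,
    //   KubernetesHostPathVolumeConf("/var/log"))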

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 143dc8a..7e67b51 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -19,10 +19,10 @@ package org.apache.spark.deploy.k8s.features
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
@@ -103,6 +103,7 @@ private[spark] class BasicDriverFeatureStep(
         .addToImagePullSecrets(conf.imagePullSecrets(): _*)
         .endSpec()
       .build()
+
     SparkPod(driverPod, driverContainer)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 91c54a9..abaeff0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -18,10 +18,10 @@ package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
@@ -173,6 +173,7 @@ private[spark] class BasicExecutorFeatureStep(
         .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
         .endSpec()
       .build()
+
     SparkPod(executorPod, containerWithLimitCores)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
new file mode 100644
index 0000000..bb0e2b3
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.deploy.k8s._
+
+private[spark] class MountVolumesFeatureStep(
+    kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val (volumeMounts, volumes) = constructVolumes(kubernetesConf.roleVolumes).unzip
+
+    val podWithVolumes = new PodBuilder(pod.pod)
+      .editSpec()
+      .addToVolumes(volumes.toSeq: _*)
+      .endSpec()
+      .build()
+
+    val containerWithVolumeMounts = new ContainerBuilder(pod.container)
+      .addToVolumeMounts(volumeMounts.toSeq: _*)
+      .build()
+
+    SparkPod(podWithVolumes, containerWithVolumeMounts)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+
+  private def constructVolumes(
+    volumeSpecs: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]
+  ): Iterable[(VolumeMount, Volume)] = {
+    volumeSpecs.map { spec =>
+      val volumeMount = new VolumeMountBuilder()
+        .withMountPath(spec.mountPath)
+        .withReadOnly(spec.mountReadOnly)
+        .withName(spec.volumeName)
+        .build()
+
+      val volumeBuilder = spec.volumeConf match {
+        case KubernetesHostPathVolumeConf(hostPath) =>
+          new VolumeBuilder()
+            .withHostPath(new HostPathVolumeSource(hostPath))
+
+        case KubernetesPVCVolumeConf(claimName) =>
+          new VolumeBuilder()
+            .withPersistentVolumeClaim(
+              new PersistentVolumeClaimVolumeSource(claimName, spec.mountReadOnly))
+
+        case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
+          new VolumeBuilder()
+            .withEmptyDir(
+              new EmptyDirVolumeSource(medium.getOrElse(""),
+              new Quantity(sizeLimit.orNull)))
+      }
+
+      val volume = volumeBuilder.withName(spec.volumeName).build()
+
+      (volumeMount, volume)
+    }
+  }
+}
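Applying the step follows the same pattern the builders below use (kubernetesConf is assumed to carry the parsed roleVolumes):

    val step = new MountVolumesFeatureStep(kubernetesConf)
    val podWithVolumes: SparkPod = step.configurePod(SparkPod.initialPod())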

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 0dd1c37..7208e3d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.deploy.k8s.submit
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeatureConfigStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
 
 private[spark] class KubernetesDriverBuilder(
@@ -33,10 +33,13 @@ private[spark] class KubernetesDriverBuilder(
       new MountSecretsFeatureStep(_),
     provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => EnvSecretsFeatureStep) =
-    new EnvSecretsFeatureStep(_),
-    provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
-      => LocalDirsFeatureStep) =
+      new EnvSecretsFeatureStep(_),
+    provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
+      => LocalDirsFeatureStep =
       new LocalDirsFeatureStep(_),
+    provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+      => MountVolumesFeatureStep) =
+      new MountVolumesFeatureStep(_),
     provideJavaStep: (
       KubernetesConf[KubernetesDriverSpecificConf]
         => JavaDriverFeatureStep) =
@@ -54,11 +57,15 @@ private[spark] class KubernetesDriverBuilder(
       provideServiceStep(kubernetesConf),
       provideLocalDirsStep(kubernetesConf))
 
-    val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
-      Some(provideSecretsStep(kubernetesConf)) } else None
-
-    val maybeProvideSecretsStep = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
-      Some(provideEnvSecretsStep(kubernetesConf)) } else None
+    val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      Seq(provideSecretsStep(kubernetesConf))
+    } else Nil
+    val envSecretFeature = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+      Seq(provideEnvSecretsStep(kubernetesConf))
+    } else Nil
+    val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
+      Seq(provideVolumesStep(kubernetesConf))
+    } else Nil
 
     val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
         case JavaMainAppResource(_) =>
@@ -67,10 +74,8 @@ private[spark] class KubernetesDriverBuilder(
           providePythonStep(kubernetesConf)}
       .getOrElse(provideJavaStep(kubernetesConf))
 
-    val allFeatures: Seq[KubernetesFeatureConfigStep] =
-      (baseFeatures :+ bindingsStep) ++
-        maybeRoleSecretNamesStep.toSeq ++
-        maybeProvideSecretsStep.toSeq
+    val allFeatures = (baseFeatures :+ bindingsStep) ++
+      secretFeature ++ envSecretFeature ++ volumesFeature
 
     var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
     for (feature <- allFeatures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 769a0a5..364b6fb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -17,37 +17,41 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, KubernetesFeatureConfigStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features._
 
 private[spark] class KubernetesExecutorBuilder(
-    provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
+    provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf])
+      => BasicExecutorFeatureStep =
       new BasicExecutorFeatureStep(_),
-    provideSecretsStep:
-      (KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep =
+    provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
+      => MountSecretsFeatureStep =
       new MountSecretsFeatureStep(_),
     provideEnvSecretsStep:
       (KubernetesConf[_ <: KubernetesRoleSpecificConf] => EnvSecretsFeatureStep) =
       new EnvSecretsFeatureStep(_),
     provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
       => LocalDirsFeatureStep =
-      new LocalDirsFeatureStep(_)) {
+      new LocalDirsFeatureStep(_),
+    provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+      => MountVolumesFeatureStep) =
+      new MountVolumesFeatureStep(_)) {
 
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
-    val baseFeatures = Seq(
-      provideBasicStep(kubernetesConf),
-      provideLocalDirsStep(kubernetesConf))
 
-    val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
-      Some(provideSecretsStep(kubernetesConf)) } else None
+    val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
+    val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      Seq(provideSecretsStep(kubernetesConf))
+    } else Nil
+    val secretEnvFeature = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+      Seq(provideEnvSecretsStep(kubernetesConf))
+    } else Nil
+    val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
+      Seq(provideVolumesStep(kubernetesConf))
+    } else Nil
 
-    val maybeProvideSecretsStep = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
-      Some(provideEnvSecretsStep(kubernetesConf)) } else None
-
-    val allFeatures: Seq[KubernetesFeatureConfigStep] =
-      baseFeatures ++
-      maybeRoleSecretNamesStep.toSeq ++
-      maybeProvideSecretsStep.toSeq
+    val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature
 
     var executorPod = SparkPod.initialPod()
     for (feature <- allFeatures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
new file mode 100644
index 0000000..d795d15
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class KubernetesVolumeUtilsSuite extends SparkFunSuite {
+  test("Parses hostPath volumes correctly") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
+    sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true")
+    sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    assert(volumeSpec.volumeName === "volumeName")
+    assert(volumeSpec.mountPath === "/path")
+    assert(volumeSpec.mountReadOnly === true)
+    assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
+      KubernetesHostPathVolumeConf("/hostPath"))
+  }
+
+  test("Parses persistentVolumeClaim volumes correctly") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
+    sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimeName")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    assert(volumeSpec.volumeName === "volumeName")
+    assert(volumeSpec.mountPath === "/path")
+    assert(volumeSpec.mountReadOnly === true)
+    assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
+      KubernetesPVCVolumeConf("claimeName"))
+  }
+
+  test("Parses emptyDir volumes correctly") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
+    sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
+    sparkConf.set("test.emptyDir.volumeName.options.medium", "medium")
+    sparkConf.set("test.emptyDir.volumeName.options.sizeLimit", "5G")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    assert(volumeSpec.volumeName === "volumeName")
+    assert(volumeSpec.mountPath === "/path")
+    assert(volumeSpec.mountReadOnly === true)
+    assert(volumeSpec.volumeConf.asInstanceOf[KubernetesEmptyDirVolumeConf] ===
+      KubernetesEmptyDirVolumeConf(Some("medium"), Some("5G")))
+  }
+
+  test("Parses emptyDir volume options can be optional") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
+    sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    assert(volumeSpec.volumeName === "volumeName")
+    assert(volumeSpec.mountPath === "/path")
+    assert(volumeSpec.mountReadOnly === true)
+    assert(volumeSpec.volumeConf.asInstanceOf[KubernetesEmptyDirVolumeConf] ===
+      KubernetesEmptyDirVolumeConf(None, None))
+  }
+
+  test("Defaults optional readOnly to false") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
+    sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    assert(volumeSpec.mountReadOnly === false)
+  }
+
+  test("Gracefully fails on missing mount key") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.emptyDir.volumeName.mnt.path", "/path")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
+    assert(volumeSpec.isFailure === true)
+    assert(volumeSpec.failed.get.getMessage === "emptyDir.volumeName.mount.path")
+  }
+
+  test("Gracefully fails on missing option key") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
+    sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true")
+    sparkConf.set("test.hostPath.volumeName.options.pth", "/hostPath")
+
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
+    assert(volumeSpec.isFailure === true)
+    assert(volumeSpec.failed.get.getMessage === "hostPath.volumeName.options.path")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 04b909d..165f46a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -50,6 +50,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     TEST_IMAGE_PULL_SECRETS.map { secret =>
       new LocalObjectReferenceBuilder().withName(secret).build()
     }
+  private val emptyDriverSpecificConf = KubernetesDriverSpecificConf(
+    None,
+    APP_NAME,
+    MAIN_CLASS,
+    APP_ARGS)
+
 
   test("Check the pod respects all configurations from the user.") {
     val sparkConf = new SparkConf()
@@ -62,11 +68,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
     val kubernetesConf = KubernetesConf(
       sparkConf,
-      KubernetesDriverSpecificConf(
-        Some(JavaMainAppResource("")),
-        APP_NAME,
-        MAIN_CLASS,
-        APP_ARGS),
+      emptyDriverSpecificConf,
       RESOURCE_NAME_PREFIX,
       APP_ID,
       DRIVER_LABELS,
@@ -74,6 +76,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       DRIVER_ENVS,
+      Nil,
       Seq.empty[String])
 
     val featureStep = new BasicDriverFeatureStep(kubernetesConf)
@@ -143,6 +146,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       DRIVER_ENVS,
+      Nil,
       Seq.empty[String])
     val pythonKubernetesConf = KubernetesConf(
       pythonSparkConf,
@@ -158,6 +162,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       DRIVER_ENVS,
+      Nil,
       Seq.empty[String])
     val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
     val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf)
@@ -176,11 +181,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(CONTAINER_IMAGE, "spark-driver:latest")
     val kubernetesConf = KubernetesConf(
       sparkConf,
-      KubernetesDriverSpecificConf(
-        Some(JavaMainAppResource("")),
-        APP_NAME,
-        MAIN_CLASS,
-        APP_ARGS),
+      emptyDriverSpecificConf,
       RESOURCE_NAME_PREFIX,
       APP_ID,
       DRIVER_LABELS,
@@ -188,7 +189,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       DRIVER_ENVS,
+      Nil,
       allFiles)
+
     val step = new BasicDriverFeatureStep(kubernetesConf)
     val additionalProperties = step.getAdditionalPodSystemProperties()
     val expectedSparkConf = Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index f06030a..a44fa1f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -89,6 +89,7 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map.empty,
         Map.empty,
+        Nil,
         Seq.empty[String]))
     val executor = step.configurePod(SparkPod.initialPod())
 
@@ -128,6 +129,7 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map.empty,
         Map.empty,
+        Nil,
         Seq.empty[String]))
     assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
   }
@@ -148,6 +150,7 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map.empty,
         Map("qux" -> "quux"),
+        Nil,
         Seq.empty[String]))
     val executor = step.configurePod(SparkPod.initialPod())
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index 7cea835..7e916b3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -61,6 +61,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -92,6 +93,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
 
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
@@ -130,6 +132,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 77d38bf..8b91e93 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -67,6 +67,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Map.empty,
+        Nil,
         Seq.empty[String]))
     assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
     assert(configurationStep.getAdditionalKubernetesResources().size === 1)
@@ -98,6 +99,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Map.empty,
+        Nil,
         Seq.empty[String]))
     val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
       DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
@@ -119,6 +121,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Map.empty,
+        Nil,
         Seq.empty[String]))
     val resolvedService = configurationStep
       .getAdditionalKubernetesResources()
@@ -149,6 +152,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Map.empty,
+        Nil,
         Seq.empty[String]),
       clock)
     val driverService = configurationStep
@@ -176,6 +180,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           Map.empty,
           Map.empty,
           Map.empty,
+          Nil,
           Seq.empty[String]),
         clock)
       fail("The driver bind address should not be allowed.")
@@ -201,6 +206,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           Map.empty,
           Map.empty,
           Map.empty,
+          Nil,
           Seq.empty[String]),
         clock)
       fail("The driver host address should not be allowed.")

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
index af6b35e..1c8d84b 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
@@ -45,6 +45,7 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
       Map.empty,
       envVarsToKeys,
       Map.empty,
+      Nil,
       Seq.empty[String])
 
     val step = new EnvSecretsFeatureStep(kubernetesConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index bd6ce4b..a339827 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -21,7 +21,7 @@ import org.mockito.Mockito
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
 
 class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   private val defaultLocalDir = "/var/data/default-local-dir"
@@ -45,6 +45,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index eff75b8..2b49b72 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -43,6 +43,7 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
       secretNamesToMountPaths,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
 
     val step = new MountSecretsFeatureStep(kubernetesConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
new file mode 100644
index 0000000..d309aa9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+
+class MountVolumesFeatureStepSuite extends SparkFunSuite {
+  private val sparkConf = new SparkConf(false)
+  private val emptyKubernetesConf = KubernetesConf(
+    sparkConf = sparkConf,
+    roleSpecificConf = KubernetesDriverSpecificConf(
+      None,
+      "app-name",
+      "main",
+      Seq.empty),
+    appResourceNamePrefix = "resource",
+    appId = "app-id",
+    roleLabels = Map.empty,
+    roleAnnotations = Map.empty,
+    roleSecretNamesToMountPaths = Map.empty,
+    roleSecretEnvNamesToKeyRefs = Map.empty,
+    roleEnvs = Map.empty,
+    roleVolumes = Nil,
+    sparkFiles = Nil)
+
+  test("Mounts hostPath volumes") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      false,
+      KubernetesHostPathVolumeConf("/hostPath/tmp")
+    )
+    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    assert(configuredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp")
+    assert(configuredPod.container.getVolumeMounts.size() === 1)
+    assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
+    assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume")
+    assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false)
+  }
+
+  test("Mounts pesistentVolumeClaims") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      true,
+      KubernetesPVCVolumeConf("pvcClaim")
+    )
+    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
+    assert(pvcClaim.getClaimName === "pvcClaim")
+    assert(configuredPod.container.getVolumeMounts.size() === 1)
+    assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
+    assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume")
+    assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === true)
+  }
+
+  test("Mounts emptyDir") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      false,
+      KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))
+    )
+    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    val emptyDir = configuredPod.pod.getSpec.getVolumes.get(0).getEmptyDir
+    assert(emptyDir.getMedium === "Memory")
+    assert(emptyDir.getSizeLimit.getAmount === "6G")
+    assert(configuredPod.container.getVolumeMounts.size() === 1)
+    assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
+    assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume")
+    assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false)
+  }
+
+  test("Mounts emptyDir with no options") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      false,
+      KubernetesEmptyDirVolumeConf(None, None)
+    )
+    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    val emptyDir = configuredPod.pod.getSpec.getVolumes.get(0).getEmptyDir
+    assert(emptyDir.getMedium === "")
+    assert(emptyDir.getSizeLimit.getAmount === null)
+    assert(configuredPod.container.getVolumeMounts.size() === 1)
+    assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
+    assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume")
+    assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false)
+  }
+
+  test("Mounts multiple volumes") {
+    val hpVolumeConf = KubernetesVolumeSpec(
+      "hpVolume",
+      "/tmp",
+      false,
+      KubernetesHostPathVolumeConf("/hostPath/tmp")
+    )
+    val pvcVolumeConf = KubernetesVolumeSpec(
+      "checkpointVolume",
+      "/checkpoints",
+      true,
+      KubernetesPVCVolumeConf("pvcClaim")
+    )
+    val volumesConf = hpVolumeConf :: pvcVolumeConf :: Nil
+    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumesConf)
+    val step = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
+    assert(configuredPod.container.getVolumeMounts.size() === 2)
+  }
+}
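
For readers mapping these specs back to user-facing configuration: the
KubernetesVolumeSpec instances built above are what KubernetesVolumeUtils
parses out of spark.kubernetes.*.volumes.* properties. A minimal sketch,
assuming the property scheme documented in this patch's
running-on-kubernetes.md changes (treat the exact keys as illustrative):

    import org.apache.spark.SparkConf

    // Declares a hostPath volume equivalent to the first test case above:
    // name "testVolume", mounted read-write at /tmp, backed by /hostPath/tmp.
    val conf = new SparkConf(false)
      .set("spark.kubernetes.driver.volumes.hostPath.testVolume.mount.path", "/tmp")
      .set("spark.kubernetes.driver.volumes.hostPath.testVolume.mount.readOnly", "false")
      .set("spark.kubernetes.driver.volumes.hostPath.testVolume.options.path", "/hostPath/tmp")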

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
index 0f2bf2f..18874af 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
@@ -42,6 +42,7 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite {
       roleSecretNamesToMountPaths = Map.empty,
       roleSecretEnvNamesToKeyRefs = Map.empty,
       roleEnvs = Map.empty,
+      roleVolumes = Nil,
       sparkFiles = Seq.empty[String])
 
     val step = new JavaDriverFeatureStep(kubernetesConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
index a1f9a5d..a5dac68 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
@@ -52,6 +52,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
       roleSecretNamesToMountPaths = Map.empty,
       roleSecretEnvNamesToKeyRefs = Map.empty,
       roleEnvs = Map.empty,
+      roleVolumes = Nil,
       sparkFiles = Seq.empty[String])
 
     val step = new PythonDriverFeatureStep(kubernetesConf)
@@ -88,6 +89,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
       roleSecretNamesToMountPaths = Map.empty,
       roleSecretEnvNamesToKeyRefs = Map.empty,
       roleEnvs = Map.empty,
+      roleVolumes = Nil,
       sparkFiles = Seq.empty[String])
     val step = new PythonDriverFeatureStep(kubernetesConf)
     val driverContainerwithPySpark = step.configurePod(baseDriverPod).container

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index d045d9a..4d8e791 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -141,6 +141,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
     when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
     when(kubernetesClient.pods()).thenReturn(podOperations)

http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 4e8c300..046e578 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.deploy.k8s.submit
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
 import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
 
@@ -31,6 +32,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val JAVA_STEP_TYPE = "java-bindings"
   private val PYSPARK_STEP_TYPE = "pyspark-bindings"
   private val ENV_SECRETS_STEP_TYPE = "env-secrets"
+  private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
 
   private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
@@ -56,6 +58,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
 
+  private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
+
   private val builderUnderTest: KubernetesDriverBuilder =
     new KubernetesDriverBuilder(
       _ => basicFeatureStep,
@@ -64,6 +69,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       _ => secretsStep,
       _ => envSecretsStep,
       _ => localDirsStep,
+      _ => mountVolumesStep,
       _ => javaStep,
       _ => pythonStep)
 
@@ -82,6 +88,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -107,6 +114,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map("secret" -> "secretMountPath"),
       Map("EnvName" -> "SecretName:secretKey"),
       Map.empty,
+      Nil,
       Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -134,6 +142,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -159,6 +168,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -169,6 +179,39 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       PYSPARK_STEP_TYPE)
   }
 
+  test("Apply volumes step if mounts are present.") {
+    val volumeSpec = KubernetesVolumeSpec(
+      "volume",
+      "/tmp",
+      false,
+      KubernetesHostPathVolumeConf("/path"))
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        None,
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      volumeSpec :: Nil,
+      Seq.empty[String])
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      LOCAL_DIRS_STEP_TYPE,
+      MOUNT_VOLUMES_STEP_TYPE,
+      JAVA_STEP_TYPE)
+  }
+
   private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
     : Unit = {
     assert(resolvedSpec.systemProperties.size === stepTypes.size)

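The new test above asserts that MOUNT_VOLUMES_STEP_TYPE is applied only when
roleVolumes is non-empty. A minimal sketch of the guard the builder applies
(the provideVolumesStep name is an assumption mirroring the builder's other
constructor parameters, not a verbatim excerpt):

    // Inside KubernetesDriverBuilder.buildFromFeatures: the volumes step is
    // appended only when at least one volume spec was parsed from the conf.
    val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
      Seq(provideVolumesStep(kubernetesConf))
    } else Nil
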
http://git-wip-us.apache.org/repos/asf/spark/blob/5ff1b9ba/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index a6bc8bc..d0b4127 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -19,14 +19,15 @@ package org.apache.spark.scheduler.cluster.k8s
 import io.fabric8.kubernetes.api.model.PodBuilder
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.features._
 
 class KubernetesExecutorBuilderSuite extends SparkFunSuite {
   private val BASIC_STEP_TYPE = "basic"
   private val SECRETS_STEP_TYPE = "mount-secrets"
   private val ENV_SECRETS_STEP_TYPE = "env-secrets"
   private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
+  private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
 
   private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
@@ -36,12 +37,15 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
     ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
   private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
+  private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
 
   private val builderUnderTest = new KubernetesExecutorBuilder(
     _ => basicFeatureStep,
     _ => mountSecretsStep,
     _ => envSecretsStep,
-    _ => localDirsStep)
+    _ => localDirsStep,
+    _ => mountVolumesStep)
 
   test("Basic steps are consistently applied.") {
     val conf = KubernetesConf(
@@ -55,6 +59,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map.empty,
+      Nil,
       Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
@@ -72,6 +77,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map("secret" -> "secretMountPath"),
       Map("secret-name" -> "secret-key"),
       Map.empty,
+      Nil,
       Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
@@ -81,6 +87,32 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       ENV_SECRETS_STEP_TYPE)
   }
 
+  test("Apply volumes step if mounts are present.") {
+    val volumeSpec = KubernetesVolumeSpec(
+      "volume",
+      "/tmp",
+      false,
+      KubernetesHostPathVolumeConf("/checkpoint"))
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesExecutorSpecificConf(
+        "executor-id", new PodBuilder().build()),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      volumeSpec :: Nil,
+      Seq.empty[String])
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      LOCAL_DIRS_STEP_TYPE,
+      MOUNT_VOLUMES_STEP_TYPE)
+  }
+
   private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = {
     assert(resolvedPod.pod.getMetadata.getLabels.size === stepTypes.size)
     stepTypes.foreach { stepType =>


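Both builder suites mock MountVolumesFeatureStep, so the fabric8 object
construction itself is exercised only by MountVolumesFeatureStepSuite above.
For orientation, a minimal sketch of the spec-to-volume mapping that step
performs (this mirrors the pattern match over the volume conf types, but is
not a verbatim excerpt of the step):

    import io.fabric8.kubernetes.api.model._
    import org.apache.spark.deploy.k8s._

    // Maps one parsed volume spec onto a fabric8 Volume; the matching
    // VolumeMount is built separately from mountPath/mountReadOnly.
    def toVolume(name: String, conf: KubernetesVolumeSpecificConf): Volume = {
      val builder = conf match {
        case KubernetesHostPathVolumeConf(hostPath) =>
          new VolumeBuilder().withHostPath(
            new HostPathVolumeSourceBuilder().withPath(hostPath).build())
        case KubernetesPVCVolumeConf(claimName) =>
          new VolumeBuilder().withPersistentVolumeClaim(
            new PersistentVolumeClaimVolumeSourceBuilder()
              .withClaimName(claimName).build())
        case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
          new VolumeBuilder().withEmptyDir(
            new EmptyDirVolumeSourceBuilder()
              .withMedium(medium.getOrElse(""))
              .withSizeLimit(sizeLimit.map(new Quantity(_)).orNull)
              .build())
      }
      builder.withName(name).build()
    }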