Posted to commits@spark.apache.org by va...@apache.org on 2017/10/26 23:13:54 UTC

spark git commit: [SPARK-22131][MESOS] Mesos driver secrets

Repository: spark
Updated Branches:
  refs/heads/master 4f8dc6b01 -> 5415963d2


[SPARK-22131][MESOS] Mesos driver secrets

## Background

In #18837, ArtRand added Mesos secrets support to the dispatcher. **This PR adds the same secrets support to the drivers.** This means that if the secret configs are set, the driver will launch executors that have access to either env-based or file-based secrets.

One use case for this is to support TLS in the driver <=> executor communication.
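
For illustration, a minimal sketch of the resulting configuration surface (the secret name, env key, and file name below are made-up examples, not defaults):

```scala
import org.apache.spark.SparkConf

// Hypothetical values: any contents property (names/values) can be paired
// with any destination property (envkeys/filenames), per task type.
val conf = new SparkConf()
  .set("spark.mesos.driver.secret.names", "/path/to/secret")    // reference secret, by name
  .set("spark.mesos.driver.secret.envkeys", "SECRET_ENV_KEY")   // exposed as an env var
  .set("spark.mesos.executor.secret.names", "/path/to/secret")
  .set("spark.mesos.executor.secret.filenames", "secretfile")   // written to a file
```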

## What changes were proposed in this pull request?

Most of the changes are a refactor of the dispatcher secrets support (#18837), moving it to a common place that can be used by both the dispatcher and drivers. The same goes for the unit tests.

## How was this patch tested?

There are four config combinations, spelled out below: [env or file-based] x [value or reference secret]. For each combination:
- Added a unit test.
- Tested in DC/OS.
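
Each combination pairs one contents property with one destination property (driver-side shown; the executor-side properties are analogous):

- env-based reference: `spark.mesos.driver.secret.names` + `spark.mesos.driver.secret.envkeys`
- env-based value: `spark.mesos.driver.secret.values` + `spark.mesos.driver.secret.envkeys`
- file-based reference: `spark.mesos.driver.secret.names` + `spark.mesos.driver.secret.filenames`
- file-based value: `spark.mesos.driver.secret.values` + `spark.mesos.driver.secret.filenames`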

Author: Susan X. Huynh <xh...@mesosphere.com>

Closes #19437 from susanxhuynh/sh-mesos-driver-secret.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5415963d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5415963d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5415963d

Branch: refs/heads/master
Commit: 5415963d2caaf95604211419ffc4e29fff38e1d7
Parents: 4f8dc6b
Author: Susan X. Huynh <xh...@mesosphere.com>
Authored: Thu Oct 26 16:13:48 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Oct 26 16:13:48 2017 -0700

----------------------------------------------------------------------
 docs/running-on-mesos.md                        | 111 +++++++++++---
 .../org/apache/spark/deploy/mesos/config.scala  |  64 ++++----
 .../cluster/mesos/MesosClusterScheduler.scala   | 138 ++++-------------
 .../MesosCoarseGrainedSchedulerBackend.scala    |  31 +++-
 .../MesosFineGrainedSchedulerBackend.scala      |   4 +-
 .../mesos/MesosSchedulerBackendUtil.scala       |  92 +++++++++++-
 .../mesos/MesosClusterSchedulerSuite.scala      | 150 +++----------------
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  34 ++++-
 .../mesos/MesosSchedulerBackendUtilSuite.scala  |   7 +-
 .../spark/scheduler/cluster/mesos/Utils.scala   | 107 +++++++++++++
 10 files changed, 434 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index e0944bc..b7e3e64 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -485,39 +485,106 @@ See the [configuration page](configuration.html) for information on Spark config
 </tr>
 
 <tr>
-  <td><code>spark.mesos.driver.secret.envkeys</code></td>
-  <td><code>(none)</code></td>
   <td>
-    A comma-separated list that, if set, the contents of the secret referenced
-    by spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
-    set to the provided environment variable in the driver's process.
+    <code>spark.mesos.driver.secret.values</code>,
+    <code>spark.mesos.driver.secret.names</code>,
+    <code>spark.mesos.executor.secret.values</code>,
+    <code>spark.mesos.executor.secret.names</code>,
   </td>
-  </tr>
-  <tr>
-<td><code>spark.mesos.driver.secret.filenames</code></td>
   <td><code>(none)</code></td>
   <td>
-    A comma-separated list that, if set, the contents of the secret referenced by
-    spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
-    written to the provided file. Paths are relative to the container's work
-    directory.  Absolute paths must already exist.  Consult the Mesos Secret
-    protobuf for more information.
+    <p>
+      A secret is specified by its contents and destination. These properties
+      specify a secret's contents. To specify a secret's destination, see the cell below.
+    </p>
+    <p>
+      You can specify a secret's contents either (1) by value or (2) by reference.
+    </p>
+    <p>
+      (1) To specify a secret by value, set the
+      <code>spark.mesos.[driver|executor].secret.values</code>
+      property to make the secret available in the driver or executors.
+      For example, to make a secret password "guessme" available to the driver process, set:
+    
+      <pre>spark.mesos.driver.secret.values=guessme</pre>
+    </p>
+    <p>
+      (2) To reference a secret that has been placed in a secret store,
+      specify its name within the secret store
+      by setting the <code>spark.mesos.[driver|executor].secret.names</code>
+      property. For example, to make a secret password named "password" in a secret store
+      available to the driver process, set:
+
+      <pre>spark.mesos.driver.secret.names=password</pre>
+    </p>
+    <p>
+      Note: To use a secret store, make sure one has been integrated with Mesos via a custom
+      <a href="http://mesos.apache.org/documentation/latest/secrets/">SecretResolver
+      module</a>.
+    </p>
+    <p>
+      To specify multiple secrets, provide a comma-separated list:
+
+      <pre>spark.mesos.driver.secret.values=guessme,passwd123</pre>
+
+      or
+
+      <pre>spark.mesos.driver.secret.names=password1,password2</pre>
+    </p>
   </td>
 </tr>
+
 <tr>
-  <td><code>spark.mesos.driver.secret.names</code></td>
-  <td><code>(none)</code></td>
   <td>
-    A comma-separated list of secret references. Consult the Mesos Secret
-    protobuf for more information.
+    <code>spark.mesos.driver.secret.envkeys</code>,
+    <code>spark.mesos.driver.secret.filenames</code>,
+    <code>spark.mesos.executor.secret.envkeys</code>,
+    <code>spark.mesos.executor.secret.filenames</code>,
   </td>
-</tr>
-<tr>
-  <td><code>spark.mesos.driver.secret.values</code></td>
   <td><code>(none)</code></td>
   <td>
-    A comma-separated list of secret values. Consult the Mesos Secret
-    protobuf for more information.
+    <p>
+      A secret is specified by its contents and destination. These properties
+      specify a secret's destination. To specify a secret's contents, see the cell above.
+    </p>
+    <p>
+      You can specify a secret's destination in the driver or
+      executors as either (1) an environment variable or (2) a file.
+    </p>
+    <p>
+      (1) To make an environment-based secret, set the
+      <code>spark.mesos.[driver|executor].secret.envkeys</code> property.
+      The secret will appear as an environment variable with the
+      given name in the driver or executors. For example, to make a secret password available
+      to the driver process as $PASSWORD, set:
+
+      <pre>spark.mesos.driver.secret.envkeys=PASSWORD</pre>
+    </p>
+    <p>
+      (2) To make a file-based secret, set the
+      <code>spark.mesos.[driver|executor].secret.filenames</code> property.
+      The secret will be written to a file with the given file name in
+      the driver or executors. For example, to make a secret password available in a
+      file named "pwdfile" in the driver process, set:
+
+      <pre>spark.mesos.driver.secret.filenames=pwdfile</pre>
+    </p>
+    <p>
+      Paths are relative to the container's work directory. Absolute paths must
+      already exist. Note: File-based secrets require a custom
+      <a href="http://mesos.apache.org/documentation/latest/secrets/">SecretResolver
+      module</a>.
+    </p>
+    <p>
+      To specify env vars or file names corresponding to multiple secrets,
+      provide a comma-separated list:
+
+      <pre>spark.mesos.driver.secret.envkeys=PASSWORD1,PASSWORD2</pre>
+    
+      or
+    
+      <pre>spark.mesos.driver.secret.filenames=pwdfile1,pwdfile2</pre>
+    </p>
   </td>
 </tr>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 7e85de9..821534e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -23,6 +23,39 @@ import org.apache.spark.internal.config.ConfigBuilder
 
 package object config {
 
+  private[spark] class MesosSecretConfig private[config](taskType: String) {
+    private[spark] val SECRET_NAMES =
+      ConfigBuilder(s"spark.mesos.$taskType.secret.names")
+        .doc("A comma-separated list of secret reference names. Consult the Mesos Secret " +
+          "protobuf for more information.")
+        .stringConf
+        .toSequence
+        .createOptional
+
+    private[spark] val SECRET_VALUES =
+      ConfigBuilder(s"spark.mesos.$taskType.secret.values")
+        .doc("A comma-separated list of secret values.")
+        .stringConf
+        .toSequence
+        .createOptional
+
+    private[spark] val SECRET_ENVKEYS =
+      ConfigBuilder(s"spark.mesos.$taskType.secret.envkeys")
+        .doc("A comma-separated list of the environment variables to contain the secrets." +
+          "The environment variable will be set on the driver.")
+        .stringConf
+        .toSequence
+        .createOptional
+
+    private[spark] val SECRET_FILENAMES =
+      ConfigBuilder(s"spark.mesos.$taskType.secret.filenames")
+        .doc("A comma-separated list of file paths secret will be written to.  Consult the Mesos " +
+          "Secret protobuf for more information.")
+        .stringConf
+        .toSequence
+        .createOptional
+  }
+
   /* Common app configuration. */
 
   private[spark] val SHUFFLE_CLEANER_INTERVAL_S =
@@ -64,36 +97,9 @@ package object config {
       .stringConf
       .createOptional
 
-  private[spark] val SECRET_NAME =
-    ConfigBuilder("spark.mesos.driver.secret.names")
-      .doc("A comma-separated list of secret reference names. Consult the Mesos Secret protobuf " +
-        "for more information.")
-      .stringConf
-      .toSequence
-      .createOptional
-
-  private[spark] val SECRET_VALUE =
-    ConfigBuilder("spark.mesos.driver.secret.values")
-      .doc("A comma-separated list of secret values.")
-      .stringConf
-      .toSequence
-      .createOptional
+  private[spark] val driverSecretConfig = new MesosSecretConfig("driver")
 
-  private[spark] val SECRET_ENVKEY =
-    ConfigBuilder("spark.mesos.driver.secret.envkeys")
-      .doc("A comma-separated list of the environment variables to contain the secrets." +
-        "The environment variable will be set on the driver.")
-      .stringConf
-      .toSequence
-      .createOptional
-
-  private[spark] val SECRET_FILENAME =
-    ConfigBuilder("spark.mesos.driver.secret.filenames")
-      .doc("A comma-seperated list of file paths secret will be written to.  Consult the Mesos " +
-        "Secret protobuf for more information.")
-      .stringConf
-      .toSequence
-      .createOptional
+  private[spark] val executorSecretConfig = new MesosSecretConfig("executor")
 
   private[spark] val DRIVER_FAILOVER_TIMEOUT =
     ConfigBuilder("spark.mesos.driver.failoverTimeout")

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index ec533f9..8247026 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -28,7 +28,6 @@ import org.apache.mesos.{Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
-import org.apache.mesos.protobuf.ByteString
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.MesosDriverDescription
@@ -394,39 +393,20 @@ private[spark] class MesosClusterScheduler(
     }
 
     // add secret environment variables
-    getSecretEnvVar(desc).foreach { variable =>
-      if (variable.getSecret.getReference.isInitialized) {
-        logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName}" +
-          s"on file ${variable.getName}")
-      } else {
-        logInfo(s"Setting secret on environment variable name=${variable.getName}")
-      }
-      envBuilder.addVariables(variable)
+    MesosSchedulerBackendUtil.getSecretEnvVar(desc.conf, config.driverSecretConfig)
+      .foreach { variable =>
+        if (variable.getSecret.getReference.isInitialized) {
+          logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName} " +
+            s"on file ${variable.getName}")
+        } else {
+          logInfo(s"Setting secret on environment variable name=${variable.getName}")
+        }
+        envBuilder.addVariables(variable)
     }
 
     envBuilder.build()
   }
 
-  private def getSecretEnvVar(desc: MesosDriverDescription): List[Variable] = {
-    val secrets = getSecrets(desc)
-    val secretEnvKeys = desc.conf.get(config.SECRET_ENVKEY).getOrElse(Nil)
-    if (illegalSecretInput(secretEnvKeys, secrets)) {
-      throw new SparkException(
-        s"Need to give equal numbers of secrets and environment keys " +
-          s"for environment-based reference secrets got secrets $secrets, " +
-          s"and keys $secretEnvKeys")
-    }
-
-    secrets.zip(secretEnvKeys).map {
-      case (s, k) =>
-        Variable.newBuilder()
-          .setName(k)
-          .setType(Variable.Type.SECRET)
-          .setSecret(s)
-          .build
-    }.toList
-  }
-
   private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
     val confUris = List(conf.getOption("spark.mesos.uris"),
       desc.conf.getOption("spark.mesos.uris"),
@@ -440,6 +420,23 @@ private[spark] class MesosClusterScheduler(
       CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetchCache).build())
   }
 
+  private def getContainerInfo(desc: MesosDriverDescription): ContainerInfo.Builder = {
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(desc.conf)
+
+    MesosSchedulerBackendUtil.getSecretVolume(desc.conf, config.driverSecretConfig)
+      .foreach { volume =>
+        if (volume.getSource.getSecret.getReference.isInitialized) {
+          logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName} " +
+            s"on file ${volume.getContainerPath}")
+        } else {
+          logInfo(s"Setting secret on file name=${volume.getContainerPath}")
+        }
+        containerInfo.addVolumes(volume)
+    }
+
+    containerInfo
+  }
+
   private def getDriverCommandValue(desc: MesosDriverDescription): String = {
     val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
     val executorUri = getDriverExecutorURI(desc)
@@ -579,89 +576,6 @@ private[spark] class MesosClusterScheduler(
       .build
   }
 
-  private def getContainerInfo(desc: MesosDriverDescription): ContainerInfo.Builder = {
-    val containerInfo = MesosSchedulerBackendUtil.containerInfo(desc.conf)
-
-    getSecretVolume(desc).foreach { volume =>
-      if (volume.getSource.getSecret.getReference.isInitialized) {
-        logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" +
-          s"on file ${volume.getContainerPath}")
-      } else {
-        logInfo(s"Setting secret on file name=${volume.getContainerPath}")
-      }
-      containerInfo.addVolumes(volume)
-    }
-
-    containerInfo
-  }
-
-
-  private def getSecrets(desc: MesosDriverDescription): Seq[Secret] = {
-    def createValueSecret(data: String): Secret = {
-      Secret.newBuilder()
-        .setType(Secret.Type.VALUE)
-        .setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom(data.getBytes)))
-        .build()
-    }
-
-    def createReferenceSecret(name: String): Secret = {
-      Secret.newBuilder()
-        .setReference(Secret.Reference.newBuilder().setName(name))
-        .setType(Secret.Type.REFERENCE)
-        .build()
-    }
-
-    val referenceSecrets: Seq[Secret] =
-      desc.conf.get(config.SECRET_NAME).getOrElse(Nil).map(s => createReferenceSecret(s))
-
-    val valueSecrets: Seq[Secret] = {
-      desc.conf.get(config.SECRET_VALUE).getOrElse(Nil).map(s => createValueSecret(s))
-    }
-
-    if (valueSecrets.nonEmpty && referenceSecrets.nonEmpty) {
-      throw new SparkException("Cannot specify VALUE type secrets and REFERENCE types ones")
-    }
-
-    if (referenceSecrets.nonEmpty) referenceSecrets else valueSecrets
-  }
-
-  private def illegalSecretInput(dest: Seq[String], s: Seq[Secret]): Boolean = {
-    if (dest.isEmpty) {  // no destination set (ie not using secrets of this type
-      return false
-    }
-    if (dest.nonEmpty && s.nonEmpty) {
-      // make sure there is a destination for each secret of this type
-      if (dest.length != s.length) {
-        return true
-      }
-    }
-    false
-  }
-
-  private def getSecretVolume(desc: MesosDriverDescription): List[Volume] = {
-    val secrets = getSecrets(desc)
-    val secretPaths: Seq[String] =
-      desc.conf.get(config.SECRET_FILENAME).getOrElse(Nil)
-
-    if (illegalSecretInput(secretPaths, secrets)) {
-      throw new SparkException(
-        s"Need to give equal numbers of secrets and file paths for file-based " +
-          s"reference secrets got secrets $secrets, and paths $secretPaths")
-    }
-
-    secrets.zip(secretPaths).map {
-      case (s, p) =>
-        val source = Volume.Source.newBuilder()
-          .setType(Volume.Source.Type.SECRET)
-          .setSecret(s)
-        Volume.newBuilder()
-          .setContainerPath(p)
-          .setSource(source)
-          .setMode(Volume.Mode.RO)
-          .build
-    }.toList
-  }
-
   /**
    * This method takes all the possible candidates and attempt to schedule them with Mesos offers.
    * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 603c980..104ed01 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 
-import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
@@ -244,6 +244,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         .setValue(value)
         .build())
     }
+
+    MesosSchedulerBackendUtil.getSecretEnvVar(conf, executorSecretConfig).foreach { variable =>
+      if (variable.getSecret.getReference.isInitialized) {
+        logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName} " +
+          s"on file ${variable.getName}")
+      } else {
+        logInfo(s"Setting secret on environment variable name=${variable.getName}")
+      }
+      environment.addVariables(variable)
+    }
+
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
 
@@ -424,6 +435,22 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     }
   }
 
+  private def getContainerInfo(conf: SparkConf): ContainerInfo.Builder = {
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
+
+    MesosSchedulerBackendUtil.getSecretVolume(conf, executorSecretConfig).foreach { volume =>
+      if (volume.getSource.getSecret.getReference.isInitialized) {
+        logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName} " +
+          s"on file ${volume.getContainerPath}")
+      } else {
+        logInfo(s"Setting secret on file name=${volume.getContainerPath}")
+      }
+      containerInfo.addVolumes(volume)
+    }
+
+    containerInfo
+  }
+
   /**
    * Returns a map from OfferIDs to the tasks to launch on those offers.  In order to maximize
    * per-task memory and IO, tasks are round-robin assigned to offers.
@@ -475,7 +502,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             .setName(s"${sc.appName} $taskId")
             .setLabels(MesosProtoUtils.mesosLabels(taskLabels))
             .addAllResources(resourcesToUse.asJava)
-            .setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
+            .setContainer(getContainerInfo(sc.conf))
 
           tasks(offer.getId) ::= taskBuilder.build()
           remainingResources(offerId) = resourcesLeft.asJava

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 66b8e0a..d6d939d 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -28,6 +28,7 @@ import org.apache.mesos.SchedulerDriver
 import org.apache.mesos.protobuf.ByteString
 
 import org.apache.spark.{SparkContext, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.config
 import org.apache.spark.executor.MesosExecutorBackend
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -159,7 +160,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
 
-    executorInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
+    executorInfo.setContainer(
+      MesosSchedulerBackendUtil.buildContainerInfo(sc.conf))
     (executorInfo.build(), resourcesAfterMem.asJava)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index f29e541..bfb7361 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,11 +17,15 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Volume}
+import org.apache.mesos.Protos.{ContainerInfo, Environment, Image, NetworkInfo, Parameter, Secret, Volume}
 import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
+import org.apache.mesos.Protos.Environment.Variable
+import org.apache.mesos.protobuf.ByteString
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.mesos.config.{NETWORK_LABELS, NETWORK_NAME}
+import org.apache.spark.deploy.mesos.config.MesosSecretConfig
 import org.apache.spark.internal.Logging
 
 /**
@@ -122,7 +126,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
     .toList
   }
 
-  def containerInfo(conf: SparkConf): ContainerInfo.Builder = {
+  def buildContainerInfo(conf: SparkConf): ContainerInfo.Builder = {
     val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
       conf.get("spark.mesos.containerizer", "docker") == "docker") {
       ContainerInfo.Type.DOCKER
@@ -173,6 +177,88 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
     containerInfo
   }
 
+  private def getSecrets(conf: SparkConf, secretConfig: MesosSecretConfig): Seq[Secret] = {
+    def createValueSecret(data: String): Secret = {
+      Secret.newBuilder()
+        .setType(Secret.Type.VALUE)
+        .setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom(data.getBytes)))
+        .build()
+    }
+
+    def createReferenceSecret(name: String): Secret = {
+      Secret.newBuilder()
+        .setReference(Secret.Reference.newBuilder().setName(name))
+        .setType(Secret.Type.REFERENCE)
+        .build()
+    }
+
+    val referenceSecrets: Seq[Secret] =
+      conf.get(secretConfig.SECRET_NAMES).getOrElse(Nil).map { s => createReferenceSecret(s) }
+
+    val valueSecrets: Seq[Secret] = {
+      conf.get(secretConfig.SECRET_VALUES).getOrElse(Nil).map { s => createValueSecret(s) }
+    }
+
+    if (valueSecrets.nonEmpty && referenceSecrets.nonEmpty) {
+      throw new SparkException("Cannot specify both value-type and reference-type secrets.")
+    }
+
+    if (referenceSecrets.nonEmpty) referenceSecrets else valueSecrets
+  }
+
+  private def illegalSecretInput(dest: Seq[String], secrets: Seq[Secret]): Boolean = {
+    if (dest.nonEmpty) {
+      // make sure there is a one-to-one correspondence between destinations and secrets
+      if (dest.length != secrets.length) {
+        return true
+      }
+    }
+    false
+  }
+
+  def getSecretVolume(conf: SparkConf, secretConfig: MesosSecretConfig): List[Volume] = {
+    val secrets = getSecrets(conf, secretConfig)
+    val secretPaths: Seq[String] =
+      conf.get(secretConfig.SECRET_FILENAMES).getOrElse(Nil)
+
+    if (illegalSecretInput(secretPaths, secrets)) {
+      throw new SparkException(
+        s"Need to give equal numbers of secrets and file paths for file-based " +
+          s"reference secrets got secrets $secrets, and paths $secretPaths")
+    }
+
+    secrets.zip(secretPaths).map { case (s, p) =>
+      val source = Volume.Source.newBuilder()
+        .setType(Volume.Source.Type.SECRET)
+        .setSecret(s)
+      Volume.newBuilder()
+        .setContainerPath(p)
+        .setSource(source)
+        .setMode(Volume.Mode.RO)
+        .build
+    }.toList
+  }
+
+  def getSecretEnvVar(conf: SparkConf, secretConfig: MesosSecretConfig): List[Variable] = {
+    val secrets = getSecrets(conf, secretConfig)
+    val secretEnvKeys = conf.get(secretConfig.SECRET_ENVKEYS).getOrElse(Nil)
+    if (illegalSecretInput(secretEnvKeys, secrets)) {
+      throw new SparkException(
+        s"Need to give equal numbers of secrets and environment keys " +
+          s"for environment-based reference secrets got secrets $secrets, " +
+          s"and keys $secretEnvKeys")
+    }
+
+    secrets.zip(secretEnvKeys).map { case (s, k) =>
+      Variable.newBuilder()
+        .setName(k)
+        .setType(Variable.Type.SECRET)
+        .setSecret(s)
+        .build
+    }.toList
+  }
+
   private def dockerInfo(
       image: String,
       forcePullImage: Boolean,
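
For reference, the two Secret shapes the getSecrets helper builds, written out directly against the Mesos protobufs (the name and payload are made-up):

```scala
import org.apache.mesos.Protos.Secret
import org.apache.mesos.protobuf.ByteString

// A REFERENCE secret names an entry in a secret store; resolving it at
// launch time requires a custom SecretResolver module on the Mesos side.
val byReference: Secret = Secret.newBuilder()
  .setType(Secret.Type.REFERENCE)
  .setReference(Secret.Reference.newBuilder().setName("/password"))
  .build()

// A VALUE secret carries its payload inline in the protobuf.
val byValue: Secret = Secret.newBuilder()
  .setType(Secret.Type.VALUE)
  .setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom("guessme".getBytes)))
  .build()
```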

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index ff63e3f..77acee6 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 import org.apache.mesos.Protos.{Environment, Secret, TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Value.{Scalar, Type}
 import org.apache.mesos.SchedulerDriver
-import org.apache.mesos.protobuf.ByteString
 import org.mockito.{ArgumentCaptor, Matchers}
 import org.mockito.Mockito._
 import org.scalatest.mockito.MockitoSugar
@@ -32,6 +31,7 @@ import org.scalatest.mockito.MockitoSugar
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.Command
 import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.deploy.mesos.config
 
 class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
 
@@ -341,132 +341,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("Creates an env-based reference secrets.") {
-    setScheduler()
-
-    val mem = 1000
-    val cpu = 1
-    val secretName = "/path/to/secret,/anothersecret"
-    val envKey = "SECRET_ENV_KEY,PASSWORD"
-    val driverDesc = new MesosDriverDescription(
-      "d1",
-      "jar",
-      mem,
-      cpu,
-      true,
-      command,
-      Map("spark.mesos.executor.home" -> "test",
-        "spark.app.name" -> "test",
-        "spark.mesos.driver.secret.names" -> secretName,
-        "spark.mesos.driver.secret.envkeys" -> envKey),
-      "s1",
-      new Date())
-    val response = scheduler.submitDriver(driverDesc)
-    assert(response.success)
-    val offer = Utils.createOffer("o1", "s1", mem, cpu)
-    scheduler.resourceOffers(driver, Collections.singletonList(offer))
-    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
-    assert(launchedTasks.head
-      .getCommand
-      .getEnvironment
-      .getVariablesCount == 3)  // SPARK_SUBMIT_OPS and the secret
-    val variableOne = launchedTasks.head.getCommand.getEnvironment
-      .getVariablesList.asScala.filter(_.getName == "SECRET_ENV_KEY").head
-    assert(variableOne.getSecret.isInitialized)
-    assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
-    assert(variableOne.getSecret.getReference.getName == "/path/to/secret")
-    assert(variableOne.getType == Environment.Variable.Type.SECRET)
-    val variableTwo = launchedTasks.head.getCommand.getEnvironment
-      .getVariablesList.asScala.filter(_.getName == "PASSWORD").head
-    assert(variableTwo.getSecret.isInitialized)
-    assert(variableTwo.getSecret.getType == Secret.Type.REFERENCE)
-    assert(variableTwo.getSecret.getReference.getName == "/anothersecret")
-    assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+    val launchedTasks = launchDriverTask(
+      Utils.configEnvBasedRefSecrets(config.driverSecretConfig))
+    Utils.verifyEnvBasedRefSecrets(launchedTasks)
   }
 
   test("Creates an env-based value secrets.") {
-    setScheduler()
-    val mem = 1000
-    val cpu = 1
-    val secretValues = "user,password"
-    val envKeys = "USER,PASSWORD"
-    val driverDesc = new MesosDriverDescription(
-      "d1",
-      "jar",
-      mem,
-      cpu,
-      true,
-      command,
-      Map("spark.mesos.executor.home" -> "test",
-        "spark.app.name" -> "test",
-        "spark.mesos.driver.secret.values" -> secretValues,
-        "spark.mesos.driver.secret.envkeys" -> envKeys),
-      "s1",
-      new Date())
-    val response = scheduler.submitDriver(driverDesc)
-    assert(response.success)
-    val offer = Utils.createOffer("o1", "s1", mem, cpu)
-    scheduler.resourceOffers(driver, Collections.singletonList(offer))
-    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
-    assert(launchedTasks.head
-      .getCommand
-      .getEnvironment
-      .getVariablesCount == 3)  // SPARK_SUBMIT_OPS and the secret
-    val variableOne = launchedTasks.head.getCommand.getEnvironment
-      .getVariablesList.asScala.filter(_.getName == "USER").head
-    assert(variableOne.getSecret.isInitialized)
-    assert(variableOne.getSecret.getType == Secret.Type.VALUE)
-    assert(variableOne.getSecret.getValue.getData == ByteString.copyFrom("user".getBytes))
-    assert(variableOne.getType == Environment.Variable.Type.SECRET)
-    val variableTwo = launchedTasks.head.getCommand.getEnvironment
-      .getVariablesList.asScala.filter(_.getName == "PASSWORD").head
-    assert(variableTwo.getSecret.isInitialized)
-    assert(variableTwo.getSecret.getType == Secret.Type.VALUE)
-    assert(variableTwo.getSecret.getValue.getData == ByteString.copyFrom("password".getBytes))
-    assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+    val launchedTasks = launchDriverTask(
+      Utils.configEnvBasedValueSecrets(config.driverSecretConfig))
+    Utils.verifyEnvBasedValueSecrets(launchedTasks)
   }
 
   test("Creates file-based reference secrets.") {
-    setScheduler()
-    val mem = 1000
-    val cpu = 1
-    val secretName = "/path/to/secret,/anothersecret"
-    val secretPath = "/topsecret,/mypassword"
-    val driverDesc = new MesosDriverDescription(
-      "d1",
-      "jar",
-      mem,
-      cpu,
-      true,
-      command,
-      Map("spark.mesos.executor.home" -> "test",
-        "spark.app.name" -> "test",
-        "spark.mesos.driver.secret.names" -> secretName,
-        "spark.mesos.driver.secret.filenames" -> secretPath),
-      "s1",
-      new Date())
-    val response = scheduler.submitDriver(driverDesc)
-    assert(response.success)
-    val offer = Utils.createOffer("o1", "s1", mem, cpu)
-    scheduler.resourceOffers(driver, Collections.singletonList(offer))
-    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
-    val volumes = launchedTasks.head.getContainer.getVolumesList
-    assert(volumes.size() == 2)
-    val secretVolOne = volumes.get(0)
-    assert(secretVolOne.getContainerPath == "/topsecret")
-    assert(secretVolOne.getSource.getSecret.getType == Secret.Type.REFERENCE)
-    assert(secretVolOne.getSource.getSecret.getReference.getName == "/path/to/secret")
-    val secretVolTwo = volumes.get(1)
-    assert(secretVolTwo.getContainerPath == "/mypassword")
-    assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.REFERENCE)
-    assert(secretVolTwo.getSource.getSecret.getReference.getName == "/anothersecret")
+    val launchedTasks = launchDriverTask(
+      Utils.configFileBasedRefSecrets(config.driverSecretConfig))
+    Utils.verifyFileBasedRefSecrets(launchedTasks)
   }
 
   test("Creates a file-based value secrets.") {
+    val launchedTasks = launchDriverTask(
+      Utils.configFileBasedValueSecrets(config.driverSecretConfig))
+    Utils.verifyFileBasedValueSecrets(launchedTasks)
+  }
+
+  private def launchDriverTask(addlSparkConfVars: Map[String, String]): List[TaskInfo] = {
     setScheduler()
     val mem = 1000
     val cpu = 1
-    val secretValues = "user,password"
-    val secretPath = "/whoami,/mypassword"
     val driverDesc = new MesosDriverDescription(
       "d1",
       "jar",
@@ -475,27 +376,14 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       true,
       command,
       Map("spark.mesos.executor.home" -> "test",
-        "spark.app.name" -> "test",
-        "spark.mesos.driver.secret.values" -> secretValues,
-        "spark.mesos.driver.secret.filenames" -> secretPath),
+        "spark.app.name" -> "test") ++
+        addlSparkConfVars,
       "s1",
       new Date())
     val response = scheduler.submitDriver(driverDesc)
     assert(response.success)
     val offer = Utils.createOffer("o1", "s1", mem, cpu)
     scheduler.resourceOffers(driver, Collections.singletonList(offer))
-    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
-    val volumes = launchedTasks.head.getContainer.getVolumesList
-    assert(volumes.size() == 2)
-    val secretVolOne = volumes.get(0)
-    assert(secretVolOne.getContainerPath == "/whoami")
-    assert(secretVolOne.getSource.getSecret.getType == Secret.Type.VALUE)
-    assert(secretVolOne.getSource.getSecret.getValue.getData ==
-      ByteString.copyFrom("user".getBytes))
-    val secretVolTwo = volumes.get(1)
-    assert(secretVolTwo.getContainerPath == "/mypassword")
-    assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.VALUE)
-    assert(secretVolTwo.getSource.getSecret.getValue.getData ==
-      ByteString.copyFrom("password".getBytes))
+    Utils.verifyTaskLaunched(driver, "o1")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 6c40792..f4bd1ee 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.reflect.ClassTag
 
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos._
@@ -38,7 +37,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 import org.apache.spark.scheduler.cluster.mesos.Utils._
 
 class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
@@ -653,6 +652,37 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     offerResourcesAndVerify(2, true)
   }
 
+  test("Creates an env-based reference secrets.") {
+    val launchedTasks = launchExecutorTasks(configEnvBasedRefSecrets(executorSecretConfig))
+    verifyEnvBasedRefSecrets(launchedTasks)
+  }
+
+  test("Creates an env-based value secrets.") {
+    val launchedTasks = launchExecutorTasks(configEnvBasedValueSecrets(executorSecretConfig))
+    verifyEnvBasedValueSecrets(launchedTasks)
+  }
+
+  test("Creates file-based reference secrets.") {
+    val launchedTasks = launchExecutorTasks(configFileBasedRefSecrets(executorSecretConfig))
+    verifyFileBasedRefSecrets(launchedTasks)
+  }
+
+  test("Creates a file-based value secrets.") {
+    val launchedTasks = launchExecutorTasks(configFileBasedValueSecrets(executorSecretConfig))
+    verifyFileBasedValueSecrets(launchedTasks)
+  }
+
+  private def launchExecutorTasks(sparkConfVars: Map[String, String]): List[TaskInfo] = {
+    setBackend(sparkConfVars)
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    verifyTaskLaunched(driver, "o1")
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
 
   private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
index f49d7c2..442c439 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.mesos.config
 
 class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
 
@@ -26,7 +27,8 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
     conf.set("spark.mesos.executor.docker.parameters", "a,b")
     conf.set("spark.mesos.executor.docker.image", "test")
 
-    val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
     val params = containerInfo.getDocker.getParametersList
 
     assert(params.size() == 0)
@@ -37,7 +39,8 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
     conf.set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3")
     conf.set("spark.mesos.executor.docker.image", "test")
 
-    val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
     val params = containerInfo.getDocker.getParametersList
     assert(params.size() == 3)
     assert(params.get(0).getKey == "a")

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 833db0c..5636ac5 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -24,9 +24,12 @@ import scala.collection.JavaConverters._
 import org.apache.mesos.Protos._
 import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
 import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.protobuf.ByteString
 import org.mockito.{ArgumentCaptor, Matchers}
 import org.mockito.Mockito._
 
+import org.apache.spark.deploy.mesos.config.MesosSecretConfig
+
 object Utils {
 
   val TEST_FRAMEWORK_ID = FrameworkID.newBuilder()
@@ -105,4 +108,108 @@ object Utils {
   def createTaskId(taskId: String): TaskID = {
     TaskID.newBuilder().setValue(taskId).build()
   }
+
+  def configEnvBasedRefSecrets(secretConfig: MesosSecretConfig): Map[String, String] = {
+    val secretName = "/path/to/secret,/anothersecret"
+    val envKey = "SECRET_ENV_KEY,PASSWORD"
+    Map(
+      secretConfig.SECRET_NAMES.key -> secretName,
+      secretConfig.SECRET_ENVKEYS.key -> envKey
+    )
+  }
+
+  def verifyEnvBasedRefSecrets(launchedTasks: List[TaskInfo]): Unit = {
+    val envVars = launchedTasks.head
+      .getCommand
+      .getEnvironment
+      .getVariablesList
+      .asScala
+    assert(envVars
+      .count(!_.getName.startsWith("SPARK_")) == 2)  // user-defined secret env vars
+    val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head
+    assert(variableOne.getSecret.isInitialized)
+    assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
+    assert(variableOne.getSecret.getReference.getName == "/path/to/secret")
+    assert(variableOne.getType == Environment.Variable.Type.SECRET)
+    val variableTwo = envVars.filter(_.getName == "PASSWORD").head
+    assert(variableTwo.getSecret.isInitialized)
+    assert(variableTwo.getSecret.getType == Secret.Type.REFERENCE)
+    assert(variableTwo.getSecret.getReference.getName == "/anothersecret")
+    assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+  }
+
+  def configEnvBasedValueSecrets(secretConfig: MesosSecretConfig): Map[String, String] = {
+    val secretValues = "user,password"
+    val envKeys = "USER,PASSWORD"
+    Map(
+      secretConfig.SECRET_VALUES.key -> secretValues,
+      secretConfig.SECRET_ENVKEYS.key -> envKeys
+    )
+  }
+
+  def verifyEnvBasedValueSecrets(launchedTasks: List[TaskInfo]): Unit = {
+    val envVars = launchedTasks.head
+      .getCommand
+      .getEnvironment
+      .getVariablesList
+      .asScala
+    assert(envVars
+      .count(!_.getName.startsWith("SPARK_")) == 2)  // user-defined secret env vars
+    val variableOne = envVars.filter(_.getName == "USER").head
+    assert(variableOne.getSecret.isInitialized)
+    assert(variableOne.getSecret.getType == Secret.Type.VALUE)
+    assert(variableOne.getSecret.getValue.getData == ByteString.copyFrom("user".getBytes))
+    assert(variableOne.getType == Environment.Variable.Type.SECRET)
+    val variableTwo = envVars.filter(_.getName == "PASSWORD").head
+    assert(variableTwo.getSecret.isInitialized)
+    assert(variableTwo.getSecret.getType == Secret.Type.VALUE)
+    assert(variableTwo.getSecret.getValue.getData == ByteString.copyFrom("password".getBytes))
+    assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+  }
+
+  def configFileBasedRefSecrets(secretConfig: MesosSecretConfig): Map[String, String] = {
+    val secretName = "/path/to/secret,/anothersecret"
+    val secretPath = "/topsecret,/mypassword"
+    Map(
+      secretConfig.SECRET_NAMES.key -> secretName,
+      secretConfig.SECRET_FILENAMES.key -> secretPath
+    )
+  }
+
+  def verifyFileBasedRefSecrets(launchedTasks: List[TaskInfo]): Unit = {
+    val volumes = launchedTasks.head.getContainer.getVolumesList
+    assert(volumes.size() == 2)
+    val secretVolOne = volumes.get(0)
+    assert(secretVolOne.getContainerPath == "/topsecret")
+    assert(secretVolOne.getSource.getSecret.getType == Secret.Type.REFERENCE)
+    assert(secretVolOne.getSource.getSecret.getReference.getName == "/path/to/secret")
+    val secretVolTwo = volumes.get(1)
+    assert(secretVolTwo.getContainerPath == "/mypassword")
+    assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.REFERENCE)
+    assert(secretVolTwo.getSource.getSecret.getReference.getName == "/anothersecret")
+  }
+
+  def configFileBasedValueSecrets(secretConfig: MesosSecretConfig): Map[String, String] = {
+    val secretValues = "user,password"
+    val secretPath = "/whoami,/mypassword"
+    Map(
+      secretConfig.SECRET_VALUES.key -> secretValues,
+      secretConfig.SECRET_FILENAMES.key -> secretPath
+    )
+  }
+
+  def verifyFileBasedValueSecrets(launchedTasks: List[TaskInfo]): Unit = {
+    val volumes = launchedTasks.head.getContainer.getVolumesList
+    assert(volumes.size() == 2)
+    val secretVolOne = volumes.get(0)
+    assert(secretVolOne.getContainerPath == "/whoami")
+    assert(secretVolOne.getSource.getSecret.getType == Secret.Type.VALUE)
+    assert(secretVolOne.getSource.getSecret.getValue.getData ==
+      ByteString.copyFrom("user".getBytes))
+    val secretVolTwo = volumes.get(1)
+    assert(secretVolTwo.getContainerPath == "/mypassword")
+    assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.VALUE)
+    assert(secretVolTwo.getSource.getSecret.getValue.getData ==
+      ByteString.copyFrom("password".getBytes))
+  }
 }

