Posted to commits@spark.apache.org by va...@apache.org on 2017/08/31 17:59:53 UTC

spark git commit: [SPARK-20812][MESOS] Add secrets support to the dispatcher

Repository: spark
Updated Branches:
  refs/heads/master 9696580c3 -> fc45c2c88


[SPARK-20812][MESOS] Add secrets support to the dispatcher

Mesos has secrets primitives for environment- and file-based secrets; this PR adds that functionality to the Spark dispatcher, along with the appropriate configuration flags.
Unit tested and manually tested against a DC/OS cluster with Mesos 1.4.
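
A minimal sketch of the submission properties involved (the property names come from
this patch; the secret name and env key values are illustrative only):

    // Hypothetical driver properties requesting an environment-based reference secret.
    val secretProps = Map(
      "spark.mesos.driver.secret.names" -> "/mysecret",        // reference served by the secret store
      "spark.mesos.driver.secret.envkeys" -> "SECRET_ENV_KEY"  // env var set in the driver's process
    )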

Author: ArtRand <ar...@soe.ucsc.edu>

Closes #18837 from ArtRand/spark-20812-dispatcher-secrets-and-labels.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc45c2c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc45c2c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc45c2c8

Branch: refs/heads/master
Commit: fc45c2c88a838b8f46659ebad2a8f3a9923bc95f
Parents: 9696580
Author: ArtRand <ar...@soe.ucsc.edu>
Authored: Thu Aug 31 10:58:13 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Aug 31 10:58:41 2017 -0700

----------------------------------------------------------------------
 dev/deps/spark-deps-hadoop-2.6                  |   2 +-
 dev/deps/spark-deps-hadoop-2.7                  |   2 +-
 docs/running-on-mesos.md                        |  43 ++++-
 docs/security.md                                |   3 +
 resource-managers/mesos/pom.xml                 |   2 +-
 .../org/apache/spark/deploy/mesos/config.scala  |  33 +++-
 .../cluster/mesos/MesosClusterScheduler.scala   | 136 +++++++++++++++-
 .../mesos/MesosSchedulerBackendUtil.scala       |   7 +-
 .../cluster/mesos/MesosSchedulerUtils.scala     |  16 +-
 .../mesos/MesosClusterSchedulerSuite.scala      | 162 ++++++++++++++++++-
 10 files changed, 386 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index de17507..e481b4d 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -138,7 +138,7 @@ lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar
-mesos-1.0.0-shaded-protobuf.jar
+mesos-1.3.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index da826a7..b8046b1 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -139,7 +139,7 @@ lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar
-mesos-1.0.0-shaded-protobuf.jar
+mesos-1.3.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index c12b858..e0944bc 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -33,7 +33,8 @@ To get started, follow the steps below to install Mesos and deploy Spark jobs vi
 # Installing Mesos
 
 Spark {{site.SPARK_VERSION}} is designed for use with Mesos {{site.MESOS_VERSION}} or newer and does not
-require any special patches of Mesos.
+require any special patches of Mesos. File- and environment-based secrets support requires Mesos 1.3.0 or
+newer.
 
 If you already have a Mesos cluster running, you can skip this Mesos installation step.
 
@@ -430,7 +431,8 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.mesos.secret</code></td>
   <td>(none)</td>
   <td>
-    Set the secret with which Spark framework will use to authenticate with Mesos.
+    Set the secret the Spark framework will use to authenticate with Mesos. Used, for example, when
+    authenticating with the registry.
   </td>
 </tr>
 <tr>
@@ -483,6 +485,43 @@ See the [configuration page](configuration.html) for information on Spark config
 </tr>
 
 <tr>
+  <td><code>spark.mesos.driver.secret.envkeys</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    A comma-separated list of environment variable names. If set, the contents of the
+    secrets referenced by spark.mesos.driver.secret.names or spark.mesos.driver.secret.values
+    are set in these environment variables in the driver's process.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.driver.secret.filenames</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    A comma-separated list of file paths. If set, the contents of the secrets referenced by
+    spark.mesos.driver.secret.names or spark.mesos.driver.secret.values are
+    written to these files. Paths are relative to the container's work
+    directory. Absolute paths must already exist. Consult the Mesos Secret
+    protobuf for more information.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.driver.secret.names</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    A comma-separated list of secret references. Consult the Mesos Secret
+    protobuf for more information.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.driver.secret.values</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    A comma-separated list of secret values. Consult the Mesos Secret
+    protobuf for more information.
+  </td>
+</tr>
+
+<tr>
   <td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
   <td><code>(none)</code></td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
index 9eda428..1d00400 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -73,6 +73,9 @@ For long-running apps like Spark Streaming apps to be able to write to HDFS, it
 ### Standalone mode
 The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
 
+### Mesos mode
+Mesos 1.3.0 and newer supports `Secrets` primitives as both file-based and environment-based secrets. Spark allows the specification of file-based and environment-variable-based secrets with the `spark.mesos.driver.secret.filenames` and `spark.mesos.driver.secret.envkeys` configuration properties, respectively. Depending on the secret store backend, secrets can be passed by reference or by value with the `spark.mesos.driver.secret.names` and `spark.mesos.driver.secret.values` configuration properties, respectively. Reference-type secrets are served by the secret store and referred to by name, for example `/mysecret`. Value-type secrets are passed on the command line and translated into their appropriate files or environment variables.
+
 ### Preparing the key-stores
 Key-stores can be generated by `keytool` program. The reference documentation for this tool is
 [here](https://docs.oracle.com/javase/7/docs/technotes/tools/solaris/keytool.html). The most basic
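
The two secret shapes above correspond directly to the Mesos Secret protobuf; a minimal
sketch of both builders, mirroring createReferenceSecret/createValueSecret added in
MesosClusterScheduler.scala below (the name and value are illustrative):

    import org.apache.mesos.Protos.Secret
    import org.apache.mesos.protobuf.ByteString

    // Reference-type secret: resolved by name against the secret store backend.
    val byReference = Secret.newBuilder()
      .setType(Secret.Type.REFERENCE)
      .setReference(Secret.Reference.newBuilder().setName("/mysecret"))
      .build()

    // Value-type secret: the raw bytes travel with the task definition.
    val byValue = Secret.newBuilder()
      .setType(Secret.Type.VALUE)
      .setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom("password".getBytes)))
      .build()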

http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index 2aa3228..de8f1c9 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -29,7 +29,7 @@
   <name>Spark Project Mesos</name>
   <properties>
     <sbt.project.name>mesos</sbt.project.name>
-    <mesos.version>1.0.0</mesos.version>
+    <mesos.version>1.3.0</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index a5015b9..7e85de9 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -58,12 +58,43 @@ package object config {
 
   private[spark] val DRIVER_LABELS =
     ConfigBuilder("spark.mesos.driver.labels")
-      .doc("Mesos labels to add to the driver.  Labels are free-form key-value pairs.  Key-value " +
+      .doc("Mesos labels to add to the driver.  Labels are free-form key-value pairs. Key-value " +
         "pairs should be separated by a colon, and commas used to list more than one." +
         "Ex. key:value,key2:value2")
       .stringConf
       .createOptional
 
+  private[spark] val SECRET_NAME =
+    ConfigBuilder("spark.mesos.driver.secret.names")
+      .doc("A comma-separated list of secret reference names. Consult the Mesos Secret protobuf " +
+        "for more information.")
+      .stringConf
+      .toSequence
+      .createOptional
+
+  private[spark] val SECRET_VALUE =
+    ConfigBuilder("spark.mesos.driver.secret.values")
+      .doc("A comma-separated list of secret values.")
+      .stringConf
+      .toSequence
+      .createOptional
+
+  private[spark] val SECRET_ENVKEY =
+    ConfigBuilder("spark.mesos.driver.secret.envkeys")
+      .doc("A comma-separated list of the environment variables to contain the secrets." +
+        "The environment variable will be set on the driver.")
+      .stringConf
+      .toSequence
+      .createOptional
+
+  private[spark] val SECRET_FILENAME =
+    ConfigBuilder("spark.mesos.driver.secret.filenames")
+      .doc("A comma-seperated list of file paths secret will be written to.  Consult the Mesos " +
+        "Secret protobuf for more information.")
+      .stringConf
+      .toSequence
+      .createOptional
+
   private[spark] val DRIVER_FAILOVER_TIMEOUT =
     ConfigBuilder("spark.mesos.driver.failoverTimeout")
       .doc("Amount of time in seconds that the master will wait to hear from the driver, " +

http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 9ee9cb1..ec533f9 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -28,6 +28,7 @@ import org.apache.mesos.{Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
+import org.apache.mesos.protobuf.ByteString
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.MesosDriverDescription
@@ -386,12 +387,46 @@ private[spark] class MesosClusterScheduler(
     val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
 
     val envBuilder = Environment.newBuilder()
+
+    // add normal environment variables
     env.foreach { case (k, v) =>
       envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v))
     }
+
+    // add secret environment variables
+    getSecretEnvVar(desc).foreach { variable =>
+      if (variable.getSecret.getReference.isInitialized) {
+        logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName}" +
+          s"on file ${variable.getName}")
+      } else {
+        logInfo(s"Setting secret on environment variable name=${variable.getName}")
+      }
+      envBuilder.addVariables(variable)
+    }
+
     envBuilder.build()
   }
 
+  private def getSecretEnvVar(desc: MesosDriverDescription): List[Variable] = {
+    val secrets = getSecrets(desc)
+    val secretEnvKeys = desc.conf.get(config.SECRET_ENVKEY).getOrElse(Nil)
+    if (illegalSecretInput(secretEnvKeys, secrets)) {
+      throw new SparkException(
+        s"Need to give equal numbers of secrets and environment keys " +
+          s"for environment-based reference secrets got secrets $secrets, " +
+          s"and keys $secretEnvKeys")
+    }
+
+    secrets.zip(secretEnvKeys).map {
+      case (s, k) =>
+        Variable.newBuilder()
+          .setName(k)
+          .setType(Variable.Type.SECRET)
+          .setSecret(s)
+          .build
+    }.toList
+  }
+
   private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
     val confUris = List(conf.getOption("spark.mesos.uris"),
       desc.conf.getOption("spark.mesos.uris"),
@@ -529,18 +564,104 @@ private[spark] class MesosClusterScheduler(
 
     val appName = desc.conf.get("spark.app.name")
 
+    val driverLabels = MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS)
+      .getOrElse(""))
+
     TaskInfo.newBuilder()
       .setTaskId(taskId)
       .setName(s"Driver for ${appName}")
       .setSlaveId(offer.offer.getSlaveId)
       .setCommand(buildDriverCommand(desc))
+      .setContainer(getContainerInfo(desc))
       .addAllResources(cpuResourcesToUse.asJava)
       .addAllResources(memResourcesToUse.asJava)
-      .setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
-      .setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
+      .setLabels(driverLabels)
       .build
   }
 
+  private def getContainerInfo(desc: MesosDriverDescription): ContainerInfo.Builder = {
+    val containerInfo = MesosSchedulerBackendUtil.containerInfo(desc.conf)
+
+    getSecretVolume(desc).foreach { volume =>
+      if (volume.getSource.getSecret.getReference.isInitialized) {
+        logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" +
+          s"on file ${volume.getContainerPath}")
+      } else {
+        logInfo(s"Setting secret on file name=${volume.getContainerPath}")
+      }
+      containerInfo.addVolumes(volume)
+    }
+
+    containerInfo
+  }
+
+
+  private def getSecrets(desc: MesosDriverDescription): Seq[Secret] = {
+    def createValueSecret(data: String): Secret = {
+      Secret.newBuilder()
+        .setType(Secret.Type.VALUE)
+        .setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom(data.getBytes)))
+        .build()
+    }
+
+    def createReferenceSecret(name: String): Secret = {
+      Secret.newBuilder()
+        .setReference(Secret.Reference.newBuilder().setName(name))
+        .setType(Secret.Type.REFERENCE)
+        .build()
+    }
+
+    val referenceSecrets: Seq[Secret] =
+      desc.conf.get(config.SECRET_NAME).getOrElse(Nil).map(s => createReferenceSecret(s))
+
+    val valueSecrets: Seq[Secret] = {
+      desc.conf.get(config.SECRET_VALUE).getOrElse(Nil).map(s => createValueSecret(s))
+    }
+
+    if (valueSecrets.nonEmpty && referenceSecrets.nonEmpty) {
+      throw new SparkException("Cannot specify VALUE type secrets and REFERENCE types ones")
+    }
+
+    if (referenceSecrets.nonEmpty) referenceSecrets else valueSecrets
+  }
+
+  private def illegalSecretInput(dest: Seq[String], s: Seq[Secret]): Boolean = {
+    if (dest.isEmpty) {  // no destination set (i.e., not using secrets of this type)
+      return false
+    }
+    if (dest.nonEmpty && s.nonEmpty) {
+      // make sure there is a destination for each secret of this type
+      if (dest.length != s.length) {
+        return true
+      }
+    }
+    false
+  }
+
+  private def getSecretVolume(desc: MesosDriverDescription): List[Volume] = {
+    val secrets = getSecrets(desc)
+    val secretPaths: Seq[String] =
+      desc.conf.get(config.SECRET_FILENAME).getOrElse(Nil)
+
+    if (illegalSecretInput(secretPaths, secrets)) {
+      throw new SparkException(
+        s"Need to give equal numbers of secrets and file paths for file-based " +
+          s"reference secrets got secrets $secrets, and paths $secretPaths")
+    }
+
+    secrets.zip(secretPaths).map {
+      case (s, p) =>
+        val source = Volume.Source.newBuilder()
+          .setType(Volume.Source.Type.SECRET)
+          .setSecret(s)
+        Volume.newBuilder()
+          .setContainerPath(p)
+          .setSource(source)
+          .setMode(Volume.Mode.RO)
+          .build
+    }.toList
+  }
+
   /**
    * This method takes all the possible candidates and attempt to schedule them with Mesos offers.
    * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
@@ -584,9 +705,14 @@ private[spark] class MesosClusterScheduler(
         } catch {
           case e: SparkException =>
             afterLaunchCallback(submission.submissionId)
-            finishedDrivers += new MesosClusterSubmissionState(submission, TaskID.newBuilder().
-              setValue(submission.submissionId).build(), SlaveID.newBuilder().setValue("").
-              build(), None, null, None, getDriverFrameworkID(submission))
+            finishedDrivers += new MesosClusterSubmissionState(
+              submission,
+              TaskID.newBuilder().setValue(submission.submissionId).build(),
+              SlaveID.newBuilder().setValue("").build(),
+              None,
+              null,
+              None,
+              getDriverFrameworkID(submission))
             logError(s"Failed to launch the driver with id: ${submission.submissionId}, " +
               s"cpu: $driverCpu, mem: $driverMem, reason: ${e.getMessage}")
         }
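
Secrets are matched to their destinations element-wise, which is why the counts must
agree; a rough sketch of the pairing performed by getSecretVolume (the names and paths
are the ones used in the new tests):

    val names = Seq("/path/to/secret", "/anothersecret")  // spark.mesos.driver.secret.names
    val paths = Seq("/topsecret", "/mypassword")          // spark.mesos.driver.secret.filenames
    names.zip(paths)
    // => Seq(("/path/to/secret", "/topsecret"), ("/anothersecret", "/mypassword"))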

http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index e5c1e80..f29e541 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -122,7 +122,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
     .toList
   }
 
-  def containerInfo(conf: SparkConf): ContainerInfo = {
+  def containerInfo(conf: SparkConf): ContainerInfo.Builder = {
     val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
       conf.get("spark.mesos.containerizer", "docker") == "docker") {
       ContainerInfo.Type.DOCKER
@@ -149,8 +149,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
         .getOrElse(List.empty)
 
       if (containerType == ContainerInfo.Type.DOCKER) {
-        containerInfo
-          .setDocker(dockerInfo(image, forcePullImage, portMaps, params))
+        containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
       } else {
         containerInfo.setMesos(mesosInfo(image, forcePullImage))
       }
@@ -171,7 +170,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       containerInfo.addNetworkInfos(info)
     }
 
-    containerInfo.build()
+    containerInfo
   }
 
   private def dockerInfo(
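
Returning ContainerInfo.Builder instead of a built ContainerInfo is what lets the
cluster scheduler append secret volumes before the proto is finalized; a minimal sketch
of the call-site pattern (see getContainerInfo above):

    val builder = MesosSchedulerBackendUtil.containerInfo(conf)   // now returns a Builder
    getSecretVolume(desc).foreach(v => builder.addVolumes(v))     // append secret volumes
    val container = builder.build()                               // finalize at the call site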

http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 7ec116c..6fcb30a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -510,12 +510,20 @@ trait MesosSchedulerUtils extends Logging {
   }
 
   def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match {
-    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
-    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => TaskState.RUNNING
+    case MesosTaskState.TASK_STAGING |
+         MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
+    case MesosTaskState.TASK_RUNNING |
+         MesosTaskState.TASK_KILLING => TaskState.RUNNING
     case MesosTaskState.TASK_FINISHED => TaskState.FINISHED
-    case MesosTaskState.TASK_FAILED => TaskState.FAILED
+    case MesosTaskState.TASK_FAILED |
+         MesosTaskState.TASK_GONE |
+         MesosTaskState.TASK_GONE_BY_OPERATOR => TaskState.FAILED
     case MesosTaskState.TASK_KILLED => TaskState.KILLED
-    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => TaskState.LOST
+    case MesosTaskState.TASK_LOST |
+         MesosTaskState.TASK_ERROR |
+         MesosTaskState.TASK_DROPPED |
+         MesosTaskState.TASK_UNKNOWN |
+         MesosTaskState.TASK_UNREACHABLE => TaskState.LOST
   }
 
   def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {

http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 50bb501..f0f99e9 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -21,9 +21,10 @@ import java.util.{Collection, Collections, Date}
 
 import scala.collection.JavaConverters._
 
-import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{Environment, Secret, TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Value.{Scalar, Type}
 import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.protobuf.ByteString
 import org.mockito.{ArgumentCaptor, Matchers}
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
@@ -338,4 +339,163 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
 
     verify(driver, times(1)).declineOffer(offerId, filter)
   }
+
+  test("Creates an env-based reference secrets.") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+    val secretName = "/path/to/secret,/anothersecret"
+    val envKey = "SECRET_ENV_KEY,PASSWORD"
+    val driverDesc = new MesosDriverDescription(
+      "d1",
+      "jar",
+      mem,
+      cpu,
+      true,
+      command,
+      Map("spark.mesos.executor.home" -> "test",
+        "spark.app.name" -> "test",
+        "spark.mesos.driver.secret.names" -> secretName,
+        "spark.mesos.driver.secret.envkeys" -> envKey),
+      "s1",
+      new Date())
+    val response = scheduler.submitDriver(driverDesc)
+    assert(response.success)
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, Collections.singletonList(offer))
+    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.head
+      .getCommand
+      .getEnvironment
+      .getVariablesCount == 3)  // SPARK_SUBMIT_OPS and the two secrets
+    val variableOne = launchedTasks.head.getCommand.getEnvironment
+      .getVariablesList.asScala.filter(_.getName == "SECRET_ENV_KEY").head
+    assert(variableOne.getSecret.isInitialized)
+    assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
+    assert(variableOne.getSecret.getReference.getName == "/path/to/secret")
+    assert(variableOne.getType == Environment.Variable.Type.SECRET)
+    val variableTwo = launchedTasks.head.getCommand.getEnvironment
+      .getVariablesList.asScala.filter(_.getName == "PASSWORD").head
+    assert(variableTwo.getSecret.isInitialized)
+    assert(variableTwo.getSecret.getType == Secret.Type.REFERENCE)
+    assert(variableTwo.getSecret.getReference.getName == "/anothersecret")
+    assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+  }
+
+  test("Creates an env-based value secrets.") {
+    setScheduler()
+    val mem = 1000
+    val cpu = 1
+    val secretValues = "user,password"
+    val envKeys = "USER,PASSWORD"
+    val driverDesc = new MesosDriverDescription(
+      "d1",
+      "jar",
+      mem,
+      cpu,
+      true,
+      command,
+      Map("spark.mesos.executor.home" -> "test",
+        "spark.app.name" -> "test",
+        "spark.mesos.driver.secret.values" -> secretValues,
+        "spark.mesos.driver.secret.envkeys" -> envKeys),
+      "s1",
+      new Date())
+    val response = scheduler.submitDriver(driverDesc)
+    assert(response.success)
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, Collections.singletonList(offer))
+    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.head
+      .getCommand
+      .getEnvironment
+      .getVariablesCount == 3)  // SPARK_SUBMIT_OPS and the two secrets
+    val variableOne = launchedTasks.head.getCommand.getEnvironment
+      .getVariablesList.asScala.filter(_.getName == "USER").head
+    assert(variableOne.getSecret.isInitialized)
+    assert(variableOne.getSecret.getType == Secret.Type.VALUE)
+    assert(variableOne.getSecret.getValue.getData == ByteString.copyFrom("user".getBytes))
+    assert(variableOne.getType == Environment.Variable.Type.SECRET)
+    val variableTwo = launchedTasks.head.getCommand.getEnvironment
+      .getVariablesList.asScala.filter(_.getName == "PASSWORD").head
+    assert(variableTwo.getSecret.isInitialized)
+    assert(variableTwo.getSecret.getType == Secret.Type.VALUE)
+    assert(variableTwo.getSecret.getValue.getData == ByteString.copyFrom("password".getBytes))
+    assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+  }
+
+  test("Creates file-based reference secrets.") {
+    setScheduler()
+    val mem = 1000
+    val cpu = 1
+    val secretName = "/path/to/secret,/anothersecret"
+    val secretPath = "/topsecret,/mypassword"
+    val driverDesc = new MesosDriverDescription(
+      "d1",
+      "jar",
+      mem,
+      cpu,
+      true,
+      command,
+      Map("spark.mesos.executor.home" -> "test",
+        "spark.app.name" -> "test",
+        "spark.mesos.driver.secret.names" -> secretName,
+        "spark.mesos.driver.secret.filenames" -> secretPath),
+      "s1",
+      new Date())
+    val response = scheduler.submitDriver(driverDesc)
+    assert(response.success)
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, Collections.singletonList(offer))
+    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+    val volumes = launchedTasks.head.getContainer.getVolumesList
+    assert(volumes.size() == 2)
+    val secretVolOne = volumes.get(0)
+    assert(secretVolOne.getContainerPath == "/topsecret")
+    assert(secretVolOne.getSource.getSecret.getType == Secret.Type.REFERENCE)
+    assert(secretVolOne.getSource.getSecret.getReference.getName == "/path/to/secret")
+    val secretVolTwo = volumes.get(1)
+    assert(secretVolTwo.getContainerPath == "/mypassword")
+    assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.REFERENCE)
+    assert(secretVolTwo.getSource.getSecret.getReference.getName == "/anothersecret")
+  }
+
+  test("Creates a file-based value secrets.") {
+    setScheduler()
+    val mem = 1000
+    val cpu = 1
+    val secretValues = "user,password"
+    val secretPath = "/whoami,/mypassword"
+    val driverDesc = new MesosDriverDescription(
+      "d1",
+      "jar",
+      mem,
+      cpu,
+      true,
+      command,
+      Map("spark.mesos.executor.home" -> "test",
+        "spark.app.name" -> "test",
+        "spark.mesos.driver.secret.values" -> secretValues,
+        "spark.mesos.driver.secret.filenames" -> secretPath),
+      "s1",
+      new Date())
+    val response = scheduler.submitDriver(driverDesc)
+    assert(response.success)
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, Collections.singletonList(offer))
+    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+    val volumes = launchedTasks.head.getContainer.getVolumesList
+    assert(volumes.size() == 2)
+    val secretVolOne = volumes.get(0)
+    assert(secretVolOne.getContainerPath == "/whoami")
+    assert(secretVolOne.getSource.getSecret.getType == Secret.Type.VALUE)
+    assert(secretVolOne.getSource.getSecret.getValue.getData ==
+      ByteString.copyFrom("user".getBytes))
+    val secretVolTwo = volumes.get(1)
+    assert(secretVolTwo.getContainerPath == "/mypassword")
+    assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.VALUE)
+    assert(secretVolTwo.getSource.getSecret.getValue.getData ==
+      ByteString.copyFrom("password".getBytes))
+  }
 }

