You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/08/31 17:59:53 UTC
spark git commit: [SPARK-20812][MESOS] Add secrets support to the
dispatcher
Repository: spark
Updated Branches:
refs/heads/master 9696580c3 -> fc45c2c88
[SPARK-20812][MESOS] Add secrets support to the dispatcher
Mesos has secrets primitives for environment and file-based secrets, this PR adds that functionality to the Spark dispatcher and the appropriate configuration flags.
Unit tested and manually tested against a DC/OS cluster with Mesos 1.4.
Author: ArtRand <ar...@soe.ucsc.edu>
Closes #18837 from ArtRand/spark-20812-dispatcher-secrets-and-labels.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc45c2c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc45c2c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc45c2c8
Branch: refs/heads/master
Commit: fc45c2c88a838b8f46659ebad2a8f3a9923bc95f
Parents: 9696580
Author: ArtRand <ar...@soe.ucsc.edu>
Authored: Thu Aug 31 10:58:13 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Aug 31 10:58:41 2017 -0700
----------------------------------------------------------------------
dev/deps/spark-deps-hadoop-2.6 | 2 +-
dev/deps/spark-deps-hadoop-2.7 | 2 +-
docs/running-on-mesos.md | 43 ++++-
docs/security.md | 3 +
resource-managers/mesos/pom.xml | 2 +-
.../org/apache/spark/deploy/mesos/config.scala | 33 +++-
.../cluster/mesos/MesosClusterScheduler.scala | 136 +++++++++++++++-
.../mesos/MesosSchedulerBackendUtil.scala | 7 +-
.../cluster/mesos/MesosSchedulerUtils.scala | 16 +-
.../mesos/MesosClusterSchedulerSuite.scala | 162 ++++++++++++++++++-
10 files changed, 386 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index de17507..e481b4d 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -138,7 +138,7 @@ lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
mail-1.4.7.jar
-mesos-1.0.0-shaded-protobuf.jar
+mesos-1.3.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index da826a7..b8046b1 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -139,7 +139,7 @@ lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
mail-1.4.7.jar
-mesos-1.0.0-shaded-protobuf.jar
+mesos-1.3.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index c12b858..e0944bc 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -33,7 +33,8 @@ To get started, follow the steps below to install Mesos and deploy Spark jobs vi
# Installing Mesos
Spark {{site.SPARK_VERSION}} is designed for use with Mesos {{site.MESOS_VERSION}} or newer and does not
-require any special patches of Mesos.
+require any special patches of Mesos. File and environment-based secrets support requires Mesos 1.3.0 or
+newer.
If you already have a Mesos cluster running, you can skip this Mesos installation step.
@@ -430,7 +431,8 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.mesos.secret</code></td>
<td>(none)</td>
<td>
- Set the secret with which Spark framework will use to authenticate with Mesos.
+ Set the secret with which Spark framework will use to authenticate with Mesos. Used, for example, when
+ authenticating with the registry.
</td>
</tr>
<tr>
@@ -483,6 +485,43 @@ See the [configuration page](configuration.html) for information on Spark config
</tr>
<tr>
+ <td><code>spark.mesos.driver.secret.envkeys</code></td>
+ <td><code>(none)</code></td>
+ <td>
+ A comma-separated list that, if set, the contents of the secret referenced
+ by spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
+ set to the provided environment variable in the driver's process.
+ </td>
+ </tr>
+ <tr>
+<td><code>spark.mesos.driver.secret.filenames</code></td>
+ <td><code>(none)</code></td>
+ <td>
+ A comma-separated list that, if set, the contents of the secret referenced by
+ spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
+ written to the provided file. Paths are relative to the container's work
+ directory. Absolute paths must already exist. Consult the Mesos Secret
+ protobuf for more information.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.mesos.driver.secret.names</code></td>
+ <td><code>(none)</code></td>
+ <td>
+ A comma-separated list of secret references. Consult the Mesos Secret
+ protobuf for more information.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.mesos.driver.secret.values</code></td>
+ <td><code>(none)</code></td>
+ <td>
+ A comma-separated list of secret values. Consult the Mesos Secret
+ protobuf for more information.
+ </td>
+</tr>
+
+<tr>
<td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
<td><code>(none)</code></td>
<td>
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
index 9eda428..1d00400 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -73,6 +73,9 @@ For long-running apps like Spark Streaming apps to be able to write to HDFS, it
### Standalone mode
The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
+### Mesos mode
+Mesos 1.3.0 and newer supports `Secrets` primitives as both file-based and environment based secrets. Spark allows the specification of file-based and environment variable based secrets with the `spark.mesos.driver.secret.filenames` and `spark.mesos.driver.secret.envkeys`, respectively. Depending on the secret store backend secrets can be passed by reference or by value with the `spark.mesos.driver.secret.names` and `spark.mesos.driver.secret.values` configuration properties, respectively. Reference type secrets are served by the secret store and referred to by name, for example `/mysecret`. Value type secrets are passed on the command line and translated into their appropriate files or environment variables.
+
### Preparing the key-stores
Key-stores can be generated by `keytool` program. The reference documentation for this tool is
[here](https://docs.oracle.com/javase/7/docs/technotes/tools/solaris/keytool.html). The most basic
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index 2aa3228..de8f1c9 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -29,7 +29,7 @@
<name>Spark Project Mesos</name>
<properties>
<sbt.project.name>mesos</sbt.project.name>
- <mesos.version>1.0.0</mesos.version>
+ <mesos.version>1.3.0</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
</properties>
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index a5015b9..7e85de9 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -58,12 +58,43 @@ package object config {
private[spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
- .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
+ .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
"pairs should be separated by a colon, and commas used to list more than one." +
"Ex. key:value,key2:value2")
.stringConf
.createOptional
+ private[spark] val SECRET_NAME =
+ ConfigBuilder("spark.mesos.driver.secret.names")
+ .doc("A comma-separated list of secret reference names. Consult the Mesos Secret protobuf " +
+ "for more information.")
+ .stringConf
+ .toSequence
+ .createOptional
+
+ private[spark] val SECRET_VALUE =
+ ConfigBuilder("spark.mesos.driver.secret.values")
+ .doc("A comma-separated list of secret values.")
+ .stringConf
+ .toSequence
+ .createOptional
+
+ private[spark] val SECRET_ENVKEY =
+ ConfigBuilder("spark.mesos.driver.secret.envkeys")
+ .doc("A comma-separated list of the environment variables to contain the secrets." +
+ "The environment variable will be set on the driver.")
+ .stringConf
+ .toSequence
+ .createOptional
+
+ private[spark] val SECRET_FILENAME =
+ ConfigBuilder("spark.mesos.driver.secret.filenames")
+ .doc("A comma-seperated list of file paths secret will be written to. Consult the Mesos " +
+ "Secret protobuf for more information.")
+ .stringConf
+ .toSequence
+ .createOptional
+
private[spark] val DRIVER_FAILOVER_TIMEOUT =
ConfigBuilder("spark.mesos.driver.failoverTimeout")
.doc("Amount of time in seconds that the master will wait to hear from the driver, " +
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 9ee9cb1..ec533f9 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -28,6 +28,7 @@ import org.apache.mesos.{Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
+import org.apache.mesos.protobuf.ByteString
import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
import org.apache.spark.deploy.mesos.MesosDriverDescription
@@ -386,12 +387,46 @@ private[spark] class MesosClusterScheduler(
val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
val envBuilder = Environment.newBuilder()
+
+ // add normal environment variables
env.foreach { case (k, v) =>
envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v))
}
+
+ // add secret environment variables
+ getSecretEnvVar(desc).foreach { variable =>
+ if (variable.getSecret.getReference.isInitialized) {
+ logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName}" +
+ s"on file ${variable.getName}")
+ } else {
+ logInfo(s"Setting secret on environment variable name=${variable.getName}")
+ }
+ envBuilder.addVariables(variable)
+ }
+
envBuilder.build()
}
+ private def getSecretEnvVar(desc: MesosDriverDescription): List[Variable] = {
+ val secrets = getSecrets(desc)
+ val secretEnvKeys = desc.conf.get(config.SECRET_ENVKEY).getOrElse(Nil)
+ if (illegalSecretInput(secretEnvKeys, secrets)) {
+ throw new SparkException(
+ s"Need to give equal numbers of secrets and environment keys " +
+ s"for environment-based reference secrets got secrets $secrets, " +
+ s"and keys $secretEnvKeys")
+ }
+
+ secrets.zip(secretEnvKeys).map {
+ case (s, k) =>
+ Variable.newBuilder()
+ .setName(k)
+ .setType(Variable.Type.SECRET)
+ .setSecret(s)
+ .build
+ }.toList
+ }
+
private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.mesos.uris"),
@@ -529,18 +564,104 @@ private[spark] class MesosClusterScheduler(
val appName = desc.conf.get("spark.app.name")
+ val driverLabels = MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS)
+ .getOrElse(""))
+
TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
.setSlaveId(offer.offer.getSlaveId)
.setCommand(buildDriverCommand(desc))
+ .setContainer(getContainerInfo(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
- .setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
- .setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
+ .setLabels(driverLabels)
.build
}
+ private def getContainerInfo(desc: MesosDriverDescription): ContainerInfo.Builder = {
+ val containerInfo = MesosSchedulerBackendUtil.containerInfo(desc.conf)
+
+ getSecretVolume(desc).foreach { volume =>
+ if (volume.getSource.getSecret.getReference.isInitialized) {
+ logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" +
+ s"on file ${volume.getContainerPath}")
+ } else {
+ logInfo(s"Setting secret on file name=${volume.getContainerPath}")
+ }
+ containerInfo.addVolumes(volume)
+ }
+
+ containerInfo
+ }
+
+
+ private def getSecrets(desc: MesosDriverDescription): Seq[Secret] = {
+ def createValueSecret(data: String): Secret = {
+ Secret.newBuilder()
+ .setType(Secret.Type.VALUE)
+ .setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom(data.getBytes)))
+ .build()
+ }
+
+ def createReferenceSecret(name: String): Secret = {
+ Secret.newBuilder()
+ .setReference(Secret.Reference.newBuilder().setName(name))
+ .setType(Secret.Type.REFERENCE)
+ .build()
+ }
+
+ val referenceSecrets: Seq[Secret] =
+ desc.conf.get(config.SECRET_NAME).getOrElse(Nil).map(s => createReferenceSecret(s))
+
+ val valueSecrets: Seq[Secret] = {
+ desc.conf.get(config.SECRET_VALUE).getOrElse(Nil).map(s => createValueSecret(s))
+ }
+
+ if (valueSecrets.nonEmpty && referenceSecrets.nonEmpty) {
+ throw new SparkException("Cannot specify VALUE type secrets and REFERENCE types ones")
+ }
+
+ if (referenceSecrets.nonEmpty) referenceSecrets else valueSecrets
+ }
+
+ private def illegalSecretInput(dest: Seq[String], s: Seq[Secret]): Boolean = {
+ if (dest.isEmpty) { // no destination set (ie not using secrets of this type
+ return false
+ }
+ if (dest.nonEmpty && s.nonEmpty) {
+ // make sure there is a destination for each secret of this type
+ if (dest.length != s.length) {
+ return true
+ }
+ }
+ false
+ }
+
+ private def getSecretVolume(desc: MesosDriverDescription): List[Volume] = {
+ val secrets = getSecrets(desc)
+ val secretPaths: Seq[String] =
+ desc.conf.get(config.SECRET_FILENAME).getOrElse(Nil)
+
+ if (illegalSecretInput(secretPaths, secrets)) {
+ throw new SparkException(
+ s"Need to give equal numbers of secrets and file paths for file-based " +
+ s"reference secrets got secrets $secrets, and paths $secretPaths")
+ }
+
+ secrets.zip(secretPaths).map {
+ case (s, p) =>
+ val source = Volume.Source.newBuilder()
+ .setType(Volume.Source.Type.SECRET)
+ .setSecret(s)
+ Volume.newBuilder()
+ .setContainerPath(p)
+ .setSource(source)
+ .setMode(Volume.Mode.RO)
+ .build
+ }.toList
+ }
+
/**
* This method takes all the possible candidates and attempt to schedule them with Mesos offers.
* Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
@@ -584,9 +705,14 @@ private[spark] class MesosClusterScheduler(
} catch {
case e: SparkException =>
afterLaunchCallback(submission.submissionId)
- finishedDrivers += new MesosClusterSubmissionState(submission, TaskID.newBuilder().
- setValue(submission.submissionId).build(), SlaveID.newBuilder().setValue("").
- build(), None, null, None, getDriverFrameworkID(submission))
+ finishedDrivers += new MesosClusterSubmissionState(
+ submission,
+ TaskID.newBuilder().setValue(submission.submissionId).build(),
+ SlaveID.newBuilder().setValue("").build(),
+ None,
+ null,
+ None,
+ getDriverFrameworkID(submission))
logError(s"Failed to launch the driver with id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem, reason: ${e.getMessage}")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index e5c1e80..f29e541 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -122,7 +122,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.toList
}
- def containerInfo(conf: SparkConf): ContainerInfo = {
+ def containerInfo(conf: SparkConf): ContainerInfo.Builder = {
val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
conf.get("spark.mesos.containerizer", "docker") == "docker") {
ContainerInfo.Type.DOCKER
@@ -149,8 +149,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.getOrElse(List.empty)
if (containerType == ContainerInfo.Type.DOCKER) {
- containerInfo
- .setDocker(dockerInfo(image, forcePullImage, portMaps, params))
+ containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
} else {
containerInfo.setMesos(mesosInfo(image, forcePullImage))
}
@@ -171,7 +170,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
containerInfo.addNetworkInfos(info)
}
- containerInfo.build()
+ containerInfo
}
private def dockerInfo(
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 7ec116c..6fcb30a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -510,12 +510,20 @@ trait MesosSchedulerUtils extends Logging {
}
def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match {
- case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
- case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => TaskState.RUNNING
+ case MesosTaskState.TASK_STAGING |
+ MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
+ case MesosTaskState.TASK_RUNNING |
+ MesosTaskState.TASK_KILLING => TaskState.RUNNING
case MesosTaskState.TASK_FINISHED => TaskState.FINISHED
- case MesosTaskState.TASK_FAILED => TaskState.FAILED
+ case MesosTaskState.TASK_FAILED |
+ MesosTaskState.TASK_GONE |
+ MesosTaskState.TASK_GONE_BY_OPERATOR => TaskState.FAILED
case MesosTaskState.TASK_KILLED => TaskState.KILLED
- case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => TaskState.LOST
+ case MesosTaskState.TASK_LOST |
+ MesosTaskState.TASK_ERROR |
+ MesosTaskState.TASK_DROPPED |
+ MesosTaskState.TASK_UNKNOWN |
+ MesosTaskState.TASK_UNREACHABLE => TaskState.LOST
}
def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {
http://git-wip-us.apache.org/repos/asf/spark/blob/fc45c2c8/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 50bb501..f0f99e9 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -21,9 +21,10 @@ import java.util.{Collection, Collections, Date}
import scala.collection.JavaConverters._
-import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{Environment, Secret, TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Value.{Scalar, Type}
import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.protobuf.ByteString
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
@@ -338,4 +339,163 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
verify(driver, times(1)).declineOffer(offerId, filter)
}
+
+ test("Creates an env-based reference secrets.") {
+ setScheduler()
+
+ val mem = 1000
+ val cpu = 1
+ val secretName = "/path/to/secret,/anothersecret"
+ val envKey = "SECRET_ENV_KEY,PASSWORD"
+ val driverDesc = new MesosDriverDescription(
+ "d1",
+ "jar",
+ mem,
+ cpu,
+ true,
+ command,
+ Map("spark.mesos.executor.home" -> "test",
+ "spark.app.name" -> "test",
+ "spark.mesos.driver.secret.names" -> secretName,
+ "spark.mesos.driver.secret.envkeys" -> envKey),
+ "s1",
+ new Date())
+ val response = scheduler.submitDriver(driverDesc)
+ assert(response.success)
+ val offer = Utils.createOffer("o1", "s1", mem, cpu)
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+ val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+ assert(launchedTasks.head
+ .getCommand
+ .getEnvironment
+ .getVariablesCount == 3) // SPARK_SUBMIT_OPS and the secret
+ val variableOne = launchedTasks.head.getCommand.getEnvironment
+ .getVariablesList.asScala.filter(_.getName == "SECRET_ENV_KEY").head
+ assert(variableOne.getSecret.isInitialized)
+ assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
+ assert(variableOne.getSecret.getReference.getName == "/path/to/secret")
+ assert(variableOne.getType == Environment.Variable.Type.SECRET)
+ val variableTwo = launchedTasks.head.getCommand.getEnvironment
+ .getVariablesList.asScala.filter(_.getName == "PASSWORD").head
+ assert(variableTwo.getSecret.isInitialized)
+ assert(variableTwo.getSecret.getType == Secret.Type.REFERENCE)
+ assert(variableTwo.getSecret.getReference.getName == "/anothersecret")
+ assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+ }
+
+ test("Creates an env-based value secrets.") {
+ setScheduler()
+ val mem = 1000
+ val cpu = 1
+ val secretValues = "user,password"
+ val envKeys = "USER,PASSWORD"
+ val driverDesc = new MesosDriverDescription(
+ "d1",
+ "jar",
+ mem,
+ cpu,
+ true,
+ command,
+ Map("spark.mesos.executor.home" -> "test",
+ "spark.app.name" -> "test",
+ "spark.mesos.driver.secret.values" -> secretValues,
+ "spark.mesos.driver.secret.envkeys" -> envKeys),
+ "s1",
+ new Date())
+ val response = scheduler.submitDriver(driverDesc)
+ assert(response.success)
+ val offer = Utils.createOffer("o1", "s1", mem, cpu)
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+ val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+ assert(launchedTasks.head
+ .getCommand
+ .getEnvironment
+ .getVariablesCount == 3) // SPARK_SUBMIT_OPS and the secret
+ val variableOne = launchedTasks.head.getCommand.getEnvironment
+ .getVariablesList.asScala.filter(_.getName == "USER").head
+ assert(variableOne.getSecret.isInitialized)
+ assert(variableOne.getSecret.getType == Secret.Type.VALUE)
+ assert(variableOne.getSecret.getValue.getData == ByteString.copyFrom("user".getBytes))
+ assert(variableOne.getType == Environment.Variable.Type.SECRET)
+ val variableTwo = launchedTasks.head.getCommand.getEnvironment
+ .getVariablesList.asScala.filter(_.getName == "PASSWORD").head
+ assert(variableTwo.getSecret.isInitialized)
+ assert(variableTwo.getSecret.getType == Secret.Type.VALUE)
+ assert(variableTwo.getSecret.getValue.getData == ByteString.copyFrom("password".getBytes))
+ assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+ }
+
+ test("Creates file-based reference secrets.") {
+ setScheduler()
+ val mem = 1000
+ val cpu = 1
+ val secretName = "/path/to/secret,/anothersecret"
+ val secretPath = "/topsecret,/mypassword"
+ val driverDesc = new MesosDriverDescription(
+ "d1",
+ "jar",
+ mem,
+ cpu,
+ true,
+ command,
+ Map("spark.mesos.executor.home" -> "test",
+ "spark.app.name" -> "test",
+ "spark.mesos.driver.secret.names" -> secretName,
+ "spark.mesos.driver.secret.filenames" -> secretPath),
+ "s1",
+ new Date())
+ val response = scheduler.submitDriver(driverDesc)
+ assert(response.success)
+ val offer = Utils.createOffer("o1", "s1", mem, cpu)
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+ val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+ val volumes = launchedTasks.head.getContainer.getVolumesList
+ assert(volumes.size() == 2)
+ val secretVolOne = volumes.get(0)
+ assert(secretVolOne.getContainerPath == "/topsecret")
+ assert(secretVolOne.getSource.getSecret.getType == Secret.Type.REFERENCE)
+ assert(secretVolOne.getSource.getSecret.getReference.getName == "/path/to/secret")
+ val secretVolTwo = volumes.get(1)
+ assert(secretVolTwo.getContainerPath == "/mypassword")
+ assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.REFERENCE)
+ assert(secretVolTwo.getSource.getSecret.getReference.getName == "/anothersecret")
+ }
+
+ test("Creates a file-based value secrets.") {
+ setScheduler()
+ val mem = 1000
+ val cpu = 1
+ val secretValues = "user,password"
+ val secretPath = "/whoami,/mypassword"
+ val driverDesc = new MesosDriverDescription(
+ "d1",
+ "jar",
+ mem,
+ cpu,
+ true,
+ command,
+ Map("spark.mesos.executor.home" -> "test",
+ "spark.app.name" -> "test",
+ "spark.mesos.driver.secret.values" -> secretValues,
+ "spark.mesos.driver.secret.filenames" -> secretPath),
+ "s1",
+ new Date())
+ val response = scheduler.submitDriver(driverDesc)
+ assert(response.success)
+ val offer = Utils.createOffer("o1", "s1", mem, cpu)
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+ val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+ val volumes = launchedTasks.head.getContainer.getVolumesList
+ assert(volumes.size() == 2)
+ val secretVolOne = volumes.get(0)
+ assert(secretVolOne.getContainerPath == "/whoami")
+ assert(secretVolOne.getSource.getSecret.getType == Secret.Type.VALUE)
+ assert(secretVolOne.getSource.getSecret.getValue.getData ==
+ ByteString.copyFrom("user".getBytes))
+ val secretVolTwo = volumes.get(1)
+ assert(secretVolTwo.getContainerPath == "/mypassword")
+ assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.VALUE)
+ assert(secretVolTwo.getSource.getSecret.getValue.getData ==
+ ByteString.copyFrom("password".getBytes))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org