You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/08/17 22:47:15 UTC
spark git commit: [SPARK-16742] Mesos Kerberos Support
Repository: spark
Updated Branches:
refs/heads/master 6aad02d03 -> bfdc361ed
[SPARK-16742] Mesos Kerberos Support
## What changes were proposed in this pull request?
Add Kerberos Support to Mesos. This includes kinit and --keytab support, but does not include delegation token renewal.
## How was this patch tested?
Manually against a Secure DC/OS Apache HDFS cluster.
Author: ArtRand <ar...@soe.ucsc.edu>
Author: Michael Gummelt <mg...@mesosphere.io>
Closes #18519 from mgummelt/SPARK-16742-kerberos.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfdc361e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfdc361e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfdc361e
Branch: refs/heads/master
Commit: bfdc361ededb2ed4e323f075fdc40ed004b7f41d
Parents: 6aad02d
Author: ArtRand <ar...@soe.ucsc.edu>
Authored: Thu Aug 17 15:47:07 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Aug 17 15:47:07 2017 -0700
----------------------------------------------------------------------
.../apache/spark/deploy/SparkHadoopUtil.scala | 29 ++++++++++++---
.../org/apache/spark/deploy/SparkSubmit.scala | 38 +++++++++++++++-----
.../security/HadoopDelegationTokenManager.scala | 8 +++++
.../executor/CoarseGrainedExecutorBackend.scala | 7 ++++
.../cluster/CoarseGrainedClusterMessage.scala | 3 +-
.../cluster/CoarseGrainedSchedulerBackend.scala | 33 ++++++++++++++---
resource-managers/mesos/pom.xml | 11 ++++++
.../MesosCoarseGrainedSchedulerBackend.scala | 12 ++++---
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 8 -----
9 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 2a92ef9..6d507d8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy
-import java.io.{File, IOException}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
@@ -147,14 +147,18 @@ class SparkHadoopUtil extends Logging {
def isYarnMode(): Boolean = { false }
- def getCurrentUserCredentials(): Credentials = { null }
-
- def addCurrentUserCredentials(creds: Credentials) {}
-
def addSecretKeyToUserCredentials(key: String, secret: String) {}
def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
+ def getCurrentUserCredentials(): Credentials = {
+ UserGroupInformation.getCurrentUser().getCredentials()
+ }
+
+ def addCurrentUserCredentials(creds: Credentials): Unit = {
+ UserGroupInformation.getCurrentUser.addCredentials(creds)
+ }
+
def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
if (!new File(keytabFilename).exists()) {
throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
@@ -425,6 +429,21 @@ class SparkHadoopUtil extends Logging {
s"${if (status.isDirectory) "d" else "-"}$perm")
false
}
+
+ def serialize(creds: Credentials): Array[Byte] = {
+ val byteStream = new ByteArrayOutputStream
+ val dataStream = new DataOutputStream(byteStream)
+ creds.writeTokenStorageToStream(dataStream)
+ byteStream.toByteArray
+ }
+
+ def deserialize(tokenBytes: Array[Byte]): Credentials = {
+ val tokensBuf = new ByteArrayInputStream(tokenBytes)
+
+ val creds = new Credentials()
+ creds.readTokenStorageStream(new DataInputStream(tokensBuf))
+ creds
+ }
}
object SparkHadoopUtil {
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6d744a0..e7e8fbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
@@ -49,6 +50,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
+import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._
@@ -556,19 +558,25 @@ object SparkSubmit extends CommandLineUtils {
}
// assure a keytab is available from any place in a JVM
- if (clusterManager == YARN || clusterManager == LOCAL) {
+ if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
if (args.principal != null) {
- require(args.keytab != null, "Keytab must be specified when principal is specified")
- SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
- // Add keytab and principal configurations in sysProps to make them available
- // for later use; e.g. in spark sql, the isolated class loader used to talk
- // to HiveMetastore will use these settings. They will be set as Java system
- // properties and then loaded by SparkConf
- sysProps.put("spark.yarn.keytab", args.keytab)
- sysProps.put("spark.yarn.principal", args.principal)
+ if (args.keytab != null) {
+ require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+ // Add keytab and principal configurations in sysProps to make them available
+ // for later use; e.g. in spark sql, the isolated class loader used to talk
+ // to HiveMetastore will use these settings. They will be set as Java system
+ // properties and then loaded by SparkConf
+ sysProps.put("spark.yarn.keytab", args.keytab)
+ sysProps.put("spark.yarn.principal", args.principal)
+ UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ }
}
}
+ if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+ setRMPrincipal(sysProps)
+ }
+
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
@@ -653,6 +661,18 @@ object SparkSubmit extends CommandLineUtils {
(childArgs, childClasspath, sysProps, childMainClass)
}
+ // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
+ // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
+ // must trick it into thinking we're YARN.
+ private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = {
+ val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
+ val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
+ // scalastyle:off println
+ printStream.println(s"Setting ${key} to ${shortUserName}")
+ // scalastyle:on println
+ sysProps.put(key, shortUserName)
+ }
+
/**
* Run the main method of the child class using the provided launch environment.
*
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 01cbfe1..c317c4f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -55,6 +55,14 @@ private[spark] class HadoopDelegationTokenManager(
logDebug(s"Using the following delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")
+ /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
+ def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
+ this(
+ sparkConf,
+ hadoopConf,
+ hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
+ }
+
private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
new HiveDelegationTokenProvider,
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a2f1aa2..a5d60e9 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,6 +26,8 @@ import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
@@ -219,6 +221,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}
+ cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
+ val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
+ SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+ }
+
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 89a9ad6..5d65731 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -32,7 +32,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class SparkAppConfig(
sparkProperties: Seq[(String, String)],
- ioEncryptionKey: Option[Array[Byte]])
+ ioEncryptionKey: Option[Array[Byte]],
+ hadoopDelegationCreds: Option[Array[Byte]])
extends CoarseGrainedClusterMessage
case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a46824a..a0ef209 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -24,7 +24,11 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
+import org.apache.hadoop.security.UserGroupInformation
+
import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@@ -42,8 +46,8 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
- extends ExecutorAllocationClient with SchedulerBackend with Logging
-{
+ extends ExecutorAllocationClient with SchedulerBackend with Logging {
+
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
protected val totalCoreCount = new AtomicInteger(0)
// Total number of executors that are currently registered
@@ -95,6 +99,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0
+ // hadoop token manager used by some sub-classes (e.g. Mesos)
+ def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
+
+ // Hadoop delegation tokens to be sent to the executors.
+ val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
+
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
@@ -223,8 +233,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
context.reply(true)
case RetrieveSparkAppConfig =>
- val reply = SparkAppConfig(sparkProperties,
- SparkEnv.get.securityManager.getIOEncryptionKey())
+ val reply = SparkAppConfig(
+ sparkProperties,
+ SparkEnv.get.securityManager.getIOEncryptionKey(),
+ hadoopDelegationCreds)
context.reply(reply)
}
@@ -675,6 +687,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
driverEndpoint.send(KillExecutorsOnHost(host))
true
}
+
+ protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
+ if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
+ hadoopDelegationTokenManager.map { manager =>
+ val creds = UserGroupInformation.getCurrentUser.getCredentials
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ manager.obtainDelegationTokens(hadoopConf, creds)
+ SparkHadoopUtil.get.serialize(creds)
+ }
+ } else {
+ None
+ }
+ }
}
private[spark] object CoarseGrainedSchedulerBackend {
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index 20b53f2..2aa3228 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -74,6 +74,17 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-exec</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
<dependency>
<groupId>com.google.guava</groupId>
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e6b0957..5ecd466 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -22,15 +22,15 @@ import java.util.{Collections, List => JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.SchedulerDriver
-
import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -55,8 +55,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
master: String,
securityManager: SecurityManager)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
- with org.apache.mesos.Scheduler
- with MesosSchedulerUtils {
+ with org.apache.mesos.Scheduler with MesosSchedulerUtils {
+
+ override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
+ Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4fef439..3d9f99f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -74,14 +74,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
- override def getCurrentUserCredentials(): Credentials = {
- UserGroupInformation.getCurrentUser().getCredentials()
- }
-
- override def addCurrentUserCredentials(creds: Credentials) {
- UserGroupInformation.getCurrentUser().addCredentials(creds)
- }
-
override def addSecretKeyToUserCredentials(key: String, secret: String) {
val creds = new Credentials()
creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org