You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/08/17 22:47:15 UTC

spark git commit: [SPARK-16742] Mesos Kerberos Support

Repository: spark
Updated Branches:
  refs/heads/master 6aad02d03 -> bfdc361ed


[SPARK-16742] Mesos Kerberos Support

## What changes were proposed in this pull request?

Add Kerberos Support to Mesos.   This includes kinit and --keytab support, but does not include delegation token renewal.

## How was this patch tested?

Manually against a Secure DC/OS Apache HDFS cluster.

Author: ArtRand <ar...@soe.ucsc.edu>
Author: Michael Gummelt <mg...@mesosphere.io>

Closes #18519 from mgummelt/SPARK-16742-kerberos.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfdc361e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfdc361e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfdc361e

Branch: refs/heads/master
Commit: bfdc361ededb2ed4e323f075fdc40ed004b7f41d
Parents: 6aad02d
Author: ArtRand <ar...@soe.ucsc.edu>
Authored: Thu Aug 17 15:47:07 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Aug 17 15:47:07 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 29 ++++++++++++---
 .../org/apache/spark/deploy/SparkSubmit.scala   | 38 +++++++++++++++-----
 .../security/HadoopDelegationTokenManager.scala |  8 +++++
 .../executor/CoarseGrainedExecutorBackend.scala |  7 ++++
 .../cluster/CoarseGrainedClusterMessage.scala   |  3 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 33 ++++++++++++++---
 resource-managers/mesos/pom.xml                 | 11 ++++++
 .../MesosCoarseGrainedSchedulerBackend.scala    | 12 ++++---
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  8 -----
 9 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 2a92ef9..6d507d8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, IOException}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -147,14 +147,18 @@ class SparkHadoopUtil extends Logging {
 
   def isYarnMode(): Boolean = { false }
 
-  def getCurrentUserCredentials(): Credentials = { null }
-
-  def addCurrentUserCredentials(creds: Credentials) {}
-
   def addSecretKeyToUserCredentials(key: String, secret: String) {}
 
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
+  def getCurrentUserCredentials(): Credentials = {
+    UserGroupInformation.getCurrentUser().getCredentials()
+  }
+
+  def addCurrentUserCredentials(creds: Credentials): Unit = {
+    UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
   def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
     if (!new File(keytabFilename).exists()) {
       throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
@@ -425,6 +429,21 @@ class SparkHadoopUtil extends Logging {
       s"${if (status.isDirectory) "d" else "-"}$perm")
     false
   }
+
+  def serialize(creds: Credentials): Array[Byte] = {
+    val byteStream = new ByteArrayOutputStream
+    val dataStream = new DataOutputStream(byteStream)
+    creds.writeTokenStorageToStream(dataStream)
+    byteStream.toByteArray
+  }
+
+  def deserialize(tokenBytes: Array[Byte]): Credentials = {
+    val tokensBuf = new ByteArrayInputStream(tokenBytes)
+
+    val creds = new Credentials()
+    creds.readTokenStorageStream(new DataInputStream(tokensBuf))
+    creds
+  }
 }
 
 object SparkHadoopUtil {

http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6d744a0..e7e8fbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor._
@@ -49,6 +50,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
 import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
+import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util._
 
@@ -556,19 +558,25 @@ object SparkSubmit extends CommandLineUtils {
     }
 
     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL) {
+    if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
       if (args.principal != null) {
-        require(args.keytab != null, "Keytab must be specified when principal is specified")
-        SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
-        // Add keytab and principal configurations in sysProps to make them available
-        // for later use; e.g. in spark sql, the isolated class loader used to talk
-        // to HiveMetastore will use these settings. They will be set as Java system
-        // properties and then loaded by SparkConf
-        sysProps.put("spark.yarn.keytab", args.keytab)
-        sysProps.put("spark.yarn.principal", args.principal)
+        if (args.keytab != null) {
+          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+          // Add keytab and principal configurations in sysProps to make them available
+          // for later use; e.g. in spark sql, the isolated class loader used to talk
+          // to HiveMetastore will use these settings. They will be set as Java system
+          // properties and then loaded by SparkConf
+          sysProps.put("spark.yarn.keytab", args.keytab)
+          sysProps.put("spark.yarn.principal", args.principal)
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        }
       }
     }
 
+    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+      setRMPrincipal(sysProps)
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
@@ -653,6 +661,18 @@ object SparkSubmit extends CommandLineUtils {
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
+  // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
+  // renewer set to the YARN ResourceManager.  Since YARN isn't configured in Mesos mode, we
+  // must trick it into thinking we're YARN.
+  private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = {
+    val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
+    val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
+    // scalastyle:off println
+    printStream.println(s"Setting ${key} to ${shortUserName}")
+    // scalastyle:off println
+    sysProps.put(key, shortUserName)
+  }
+
   /**
    * Run the main method of the child class using the provided launch environment.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 01cbfe1..c317c4f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -55,6 +55,14 @@ private[spark] class HadoopDelegationTokenManager(
   logDebug(s"Using the following delegation token providers: " +
     s"${delegationTokenProviders.keys.mkString(", ")}.")
 
+  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
+  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
+    this(
+      sparkConf,
+      hadoopConf,
+      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
+  }
+
   private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
     val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
       new HiveDelegationTokenProvider,

http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a2f1aa2..a5d60e9 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,6 +26,8 @@ import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -219,6 +221,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         SparkHadoopUtil.get.startCredentialUpdater(driverConf)
       }
 
+      cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
+        val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
+        SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+      }
+
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 89a9ad6..5d65731 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -32,7 +32,8 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class SparkAppConfig(
       sparkProperties: Seq[(String, String)],
-      ioEncryptionKey: Option[Array[Byte]])
+      ioEncryptionKey: Option[Array[Byte]],
+      hadoopDelegationCreds: Option[Array[Byte]])
     extends CoarseGrainedClusterMessage
 
   case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage

http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a46824a..a0ef209 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -24,7 +24,11 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
 
+import org.apache.hadoop.security.UserGroupInformation
+
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -42,8 +46,8 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
  */
 private[spark]
 class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
-  extends ExecutorAllocationClient with SchedulerBackend with Logging
-{
+  extends ExecutorAllocationClient with SchedulerBackend with Logging {
+
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   protected val totalCoreCount = new AtomicInteger(0)
   // Total number of executors that are currently registered
@@ -95,6 +99,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  // hadoop token manager used by some sub-classes (e.g. Mesos)
+  def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
+
+  // Hadoop delegation tokens to be sent to the executors.
+  val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -223,8 +233,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(true)
 
       case RetrieveSparkAppConfig =>
-        val reply = SparkAppConfig(sparkProperties,
-          SparkEnv.get.securityManager.getIOEncryptionKey())
+        val reply = SparkAppConfig(
+          sparkProperties,
+          SparkEnv.get.securityManager.getIOEncryptionKey(),
+          hadoopDelegationCreds)
         context.reply(reply)
     }
 
@@ -675,6 +687,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     driverEndpoint.send(KillExecutorsOnHost(host))
     true
   }
+
+  protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
+    if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
+      hadoopDelegationTokenManager.map { manager =>
+        val creds = UserGroupInformation.getCurrentUser.getCredentials
+        val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+        manager.obtainDelegationTokens(hadoopConf, creds)
+        SparkHadoopUtil.get.serialize(creds)
+      }
+    } else {
+      None
+    }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {

http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index 20b53f2..2aa3228 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -74,6 +74,17 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
     <dependency>
       <groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e6b0957..5ecd466 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -22,15 +22,15 @@ import java.util.{Collections, List => JList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.SchedulerDriver
-
 import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -55,8 +55,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     master: String,
     securityManager: SecurityManager)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
-  with org.apache.mesos.Scheduler
-  with MesosSchedulerUtils {
+    with org.apache.mesos.Scheduler with MesosSchedulerUtils {
+
+  override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
+    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
 
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2

http://git-wip-us.apache.org/repos/asf/spark/blob/bfdc361e/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4fef439..3d9f99f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -74,14 +74,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
-  override def getCurrentUserCredentials(): Credentials = {
-    UserGroupInformation.getCurrentUser().getCredentials()
-  }
-
-  override def addCurrentUserCredentials(creds: Credentials) {
-    UserGroupInformation.getCurrentUser().addCredentials(creds)
-  }
-
   override def addSecretKeyToUserCredentials(key: String, secret: String) {
     val creds = new Credentials()
     creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org