You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by js...@apache.org on 2017/10/19 06:56:55 UTC

spark git commit: [SPARK-22290][CORE] Avoid creating Hive delegation tokens when not necessary.

Repository: spark
Updated Branches:
  refs/heads/master 6f1d0dea1 -> dc2714da5


[SPARK-22290][CORE] Avoid creating Hive delegation tokens when not necessary.

Hive delegation tokens are only needed when the Spark driver has no access
to the kerberos TGT. That happens only in two situations:

- when using a proxy user
- when using cluster mode without a keytab

This change modifies the Hive provider so that it only generates delegation
tokens in those situations, and tweaks the YARN AM so that it makes the proper
user visible to the Hive code when running with keytabs, so that the TGT
can be used instead of a delegation token.

The effect of this change is that now it's possible to initialize multiple,
non-concurrent SparkContext instances in the same JVM. Before, the second
invocation would fail to fetch a new Hive delegation token, which then could
make the second (or third or...) application fail once the token expired.
With this change, the TGT will be used to authenticate to the HMS instead.

This change also avoids polluting the current logged in user's credentials
when launching applications. The credentials are copied only when running
applications as a proxy user. This makes it possible to implement SPARK-11035
later, where multiple threads might be launching applications, and each app
should have its own set of credentials.

Tested by verifying HDFS and Hive access in following scenarios:
- client and cluster mode
- client and cluster mode with proxy user
- client and cluster mode with principal / keytab
- long-running cluster app with principal / keytab
- pyspark app that creates (and stops) multiple SparkContext instances
  through its lifetime

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #19509 from vanzin/SPARK-22290.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc2714da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc2714da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc2714da

Branch: refs/heads/master
Commit: dc2714da50ecba1bf1fdf555a82a4314f763a76e
Parents: 6f1d0de
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Oct 19 14:56:48 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Oct 19 14:56:48 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 17 +++--
 .../security/HBaseDelegationTokenProvider.scala |  4 +-
 .../security/HadoopDelegationTokenManager.scala |  2 +-
 .../HadoopDelegationTokenProvider.scala         |  2 +-
 .../HadoopFSDelegationTokenProvider.scala       |  4 +-
 .../security/HiveDelegationTokenProvider.scala  | 20 +++++-
 docs/running-on-yarn.md                         |  9 +++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 69 ++++++++++++++++----
 .../org/apache/spark/deploy/yarn/Client.scala   |  5 +-
 .../org/apache/spark/deploy/yarn/config.scala   |  4 ++
 .../spark/sql/hive/client/HiveClientImpl.scala  |  6 --
 11 files changed, 110 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 53775db..1fa10ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -61,13 +61,17 @@ class SparkHadoopUtil extends Logging {
    * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
    */
   def runAsSparkUser(func: () => Unit) {
+    createSparkUser().doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = func()
+    })
+  }
+
+  def createSparkUser(): UserGroupInformation = {
     val user = Utils.getCurrentUserName()
-    logDebug("running as user: " + user)
+    logDebug("creating UGI for user: " + user)
     val ugi = UserGroupInformation.createRemoteUser(user)
     transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    ugi
   }
 
   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
@@ -417,6 +421,11 @@ class SparkHadoopUtil extends Logging {
     creds.readTokenStorageStream(new DataInputStream(tokensBuf))
     creds
   }
+
+  def isProxyUser(ugi: UserGroupInformation): Boolean = {
+    ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
+  }
+
 }
 
 object SparkHadoopUtil {

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index 78b0e6b..5dcde4e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -56,7 +56,9 @@ private[security] class HBaseDelegationTokenProvider
     None
   }
 
-  override def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Boolean = {
     hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index c134b7e..483d0de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -115,7 +115,7 @@ private[spark] class HadoopDelegationTokenManager(
       hadoopConf: Configuration,
       creds: Credentials): Long = {
     delegationTokenProviders.values.flatMap { provider =>
-      if (provider.delegationTokensRequired(hadoopConf)) {
+      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
         provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
       } else {
         logDebug(s"Service ${provider.serviceName} does not require a token." +

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
index 1ba245e..ed09050 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
@@ -37,7 +37,7 @@ private[spark] trait HadoopDelegationTokenProvider {
    * Returns true if delegation tokens are required for this service. By default, it is based on
    * whether Hadoop security is enabled.
    */
-  def delegationTokensRequired(hadoopConf: Configuration): Boolean
+  def delegationTokensRequired(sparkConf: SparkConf, hadoopConf: Configuration): Boolean
 
   /**
    * Obtain delegation tokens for this service and get the time of the next renewal.

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 300773c..21ca669 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -69,7 +69,9 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
     nextRenewalDate
   }
 
-  def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Boolean = {
     UserGroupInformation.isSecurityEnabled
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
index b31cc59..ece5ce7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
@@ -31,7 +31,9 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.Token
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.KEYTAB
 import org.apache.spark.util.Utils
 
 private[security] class HiveDelegationTokenProvider
@@ -55,9 +57,21 @@ private[security] class HiveDelegationTokenProvider
     }
   }
 
-  override def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Boolean = {
+    // Delegation tokens are needed only when:
+    // - trying to connect to a secure metastore
+    // - either deploying in cluster mode without a keytab, or impersonating another user
+    //
+    // Other modes (such as client with or without keytab, or cluster mode with keytab) do not need
+    // a delegation token, since there's a valid kerberos TGT for the right user available to the
+    // driver, which is the only process that connects to the HMS.
+    val deployMode = sparkConf.get("spark.submit.deployMode", "client")
     UserGroupInformation.isSecurityEnabled &&
-      hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty
+      hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty &&
+      (SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser()) ||
+        (deployMode == "cluster" && !sparkConf.contains(KEYTAB)))
   }
 
   override def obtainDelegationTokens(
@@ -83,7 +97,7 @@ private[security] class HiveDelegationTokenProvider
 
         val hive2Token = new Token[DelegationTokenIdentifier]()
         hive2Token.decodeFromUrlString(tokenStr)
-        logInfo(s"Get Token from hive metastore: ${hive2Token.toString}")
+        logDebug(s"Get Token from hive metastore: ${hive2Token.toString}")
         creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4326395..9599d40 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -402,6 +402,15 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
 </tr>
 <tr>
+  <td><code>spark.yarn.kerberos.relogin.period</code></td>
+  <td>1m</td>
+  <td>
+  How often to check whether the kerberos TGT should be renewed. This should be set to a value
+  that is shorter than the TGT renewal period (or the TGT lifetime if TGT renewal is not enabled).
+  The default value should be enough for most deployments.
+  </td>
+</tr>
+<tr>
   <td><code>spark.yarn.config.gatewayPath</code></td>
   <td>(none)</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e227bff..f616723 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
 import java.io.{File, IOException}
 import java.lang.reflect.InvocationTargetException
 import java.net.{Socket, URI, URL}
+import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}
 
 import scala.collection.mutable.HashMap
@@ -28,6 +29,7 @@ import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -49,10 +51,7 @@ import org.apache.spark.util._
 /**
  * Common application master functionality for Spark on Yarn.
  */
-private[spark] class ApplicationMaster(
-    args: ApplicationMasterArguments,
-    client: YarnRMClient)
-  extends Logging {
+private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
 
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
@@ -62,6 +61,46 @@ private[spark] class ApplicationMaster(
     .asInstanceOf[YarnConfiguration]
   private val isClusterMode = args.userClass != null
 
+  private val ugi = {
+    val original = UserGroupInformation.getCurrentUser()
+
+    // If a principal and keytab were provided, log in to kerberos, and set up a thread to
+    // renew the kerberos ticket when needed. Because the UGI API does not expose the TTL
+    // of the TGT, use a configuration to define how often to check that a relogin is necessary.
+    // checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed.
+    val principal = sparkConf.get(PRINCIPAL).orNull
+    val keytab = sparkConf.get(KEYTAB).orNull
+    if (principal != null && keytab != null) {
+      UserGroupInformation.loginUserFromKeytab(principal, keytab)
+
+      val renewer = new Thread() {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          while (true) {
+            TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))
+            UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()
+          }
+        }
+      }
+      renewer.setName("am-kerberos-renewer")
+      renewer.setDaemon(true)
+      renewer.start()
+
+      // Transfer the original user's tokens to the new user, since that's needed to connect to
+      // YARN. It also copies over any delegation tokens that might have been created by the
+      // client, which will then be transferred over when starting executors (until new ones
+      // are created by the periodic task).
+      val newUser = UserGroupInformation.getCurrentUser()
+      SparkHadoopUtil.get.transferCredentials(original, newUser)
+      newUser
+    } else {
+      SparkHadoopUtil.get.createSparkUser()
+    }
+  }
+
+  private val client = ugi.doAs(new PrivilegedExceptionAction[YarnRMClient]() {
+    def run: YarnRMClient = new YarnRMClient()
+  })
+
   // Default to twice the number of executors (twice the maximum number of executors if dynamic
   // allocation is enabled), with a minimum of 3.
 
@@ -201,6 +240,13 @@ private[spark] class ApplicationMaster(
   }
 
   final def run(): Int = {
+    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+      def run: Unit = runImpl()
+    })
+    exitCode
+  }
+
+  private def runImpl(): Unit = {
     try {
       val appAttemptId = client.getAttemptId()
 
@@ -254,11 +300,6 @@ private[spark] class ApplicationMaster(
         }
       }
 
-      // Call this to force generation of secret so it gets populated into the
-      // Hadoop UGI. This has to happen before the startUserApplication which does a
-      // doAs in order for the credentials to be passed on to the executor containers.
-      val securityMgr = new SecurityManager(sparkConf)
-
       // If the credentials file config is present, we must periodically renew tokens. So create
       // a new AMDelegationTokenRenewer
       if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
@@ -284,6 +325,9 @@ private[spark] class ApplicationMaster(
         credentialRenewerThread.join()
       }
 
+      // Call this to force generation of secret so it gets populated into the Hadoop UGI.
+      val securityMgr = new SecurityManager(sparkConf)
+
       if (isClusterMode) {
         runDriver(securityMgr)
       } else {
@@ -297,7 +341,6 @@ private[spark] class ApplicationMaster(
           ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
           "Uncaught exception: " + e)
     }
-    exitCode
   }
 
   /**
@@ -775,10 +818,8 @@ object ApplicationMaster extends Logging {
         sys.props(k) = v
       }
     }
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      master = new ApplicationMaster(amArgs, new YarnRMClient)
-      System.exit(master.run())
-    }
+    master = new ApplicationMaster(amArgs)
+    System.exit(master.run())
   }
 
   private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 64b2b4d..1fe25c4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -394,7 +394,10 @@ private[spark] class Client(
     if (credentials != null) {
       // Add credentials to current user's UGI, so that following operations don't need to use the
       // Kerberos tgt to get delegations again in the client side.
-      UserGroupInformation.getCurrentUser.addCredentials(credentials)
+      val currentUser = UserGroupInformation.getCurrentUser()
+      if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
+        currentUser.addCredentials(credentials)
+      }
       logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 187803c..e1af8ba 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -347,6 +347,10 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefault(Long.MaxValue)
 
+  private[spark] val KERBEROS_RELOGIN_PERIOD = ConfigBuilder("spark.yarn.kerberos.relogin.period")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("1m")
+
   // The list of cache-related config entries. This is used by Client and the AM to clean
   // up the environment so that these settings do not appear on the web UI.
   private[yarn] val CACHE_CONFIGS = Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a01c312..16c95c5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -111,12 +111,6 @@ private[hive] class HiveClientImpl(
     if (clientLoader.isolationOn) {
       // Switch to the initClassLoader.
       Thread.currentThread().setContextClassLoader(initClassLoader)
-      // Set up kerberos credentials for UserGroupInformation.loginUser within current class loader
-      if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
-        val principal = sparkConf.get("spark.yarn.principal")
-        val keytab = sparkConf.get("spark.yarn.keytab")
-        SparkHadoopUtil.get.loginUserFromKeytab(principal, keytab)
-      }
       try {
         newState()
       } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org