You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/08/10 22:39:40 UTC

[2/2] spark git commit: [SPARK-14743][YARN] Add a configurable credential manager for Spark running on YARN

[SPARK-14743][YARN] Add a configurable credential manager for Spark running on YARN

## What changes were proposed in this pull request?

Add a configurable token manager for Spark on running on yarn.

### Current Problems ###

1. Supported token provider is hard-coded, currently only hdfs, hbase and hive are supported and it is impossible for user to add new token provider without code changes.
2. Also this problem exits in timely token renewer and updater.

### Changes In This Proposal ###

In this proposal, to address the problems mentioned above and make the current code more cleaner and easier to understand, mainly has 3 changes:

1. Abstract a `ServiceTokenProvider` as well as `ServiceTokenRenewable` interface for token provider. Each service wants to communicate with Spark through token way needs to implement this interface.
2. Provide a `ConfigurableTokenManager` to manage all the register token providers, also token renewer and updater. Also this class offers the API for other modules to obtain tokens, get renewal interval and so on.
3. Implement 3 built-in token providers `HDFSTokenProvider`, `HiveTokenProvider` and `HBaseTokenProvider` to keep the same semantics as supported today. Whether to load in these built-in token providers is controlled by configuration "spark.yarn.security.tokens.${service}.enabled", by default for all the built-in token providers are loaded.

### Behavior Changes ###

For the end user there's no behavior change, we still use the same configuration `spark.yarn.security.tokens.${service}.enabled` to decide which token provider is enabled (hbase or hive).

For user implemented token provider (assume the name of token provider is "test") needs to add into this class should have two configurations:

1. `spark.yarn.security.tokens.test.enabled` to true
2. `spark.yarn.security.tokens.test.class` to the full qualified class name.

So we still keep the same semantics as current code while add one new configuration.

### Current Status ###

- [x] token provider interface and management framework.
- [x] implement built-in token providers (hdfs, hbase, hive).
- [x] Coverage of unit test.
- [x] Integrated test with security cluster.

## How was this patch tested?

Unit test and integrated test.

Please suggest and review, any comment is greatly appreciated.

Author: jerryshao <ss...@hortonworks.com>

Closes #14065 from jerryshao/SPARK-16342.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab648c00
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab648c00
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab648c00

Branch: refs/heads/master
Commit: ab648c0004cfb20d53554ab333dd2d198cb94ffa
Parents: bd2c12f
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Aug 10 15:39:30 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Aug 10 15:39:30 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  38 +--
 .../executor/CoarseGrainedExecutorBackend.scala |   4 +-
 .../apache/spark/internal/config/package.scala  |   7 -
 dev/.rat-excludes                               |   1 +
 docs/running-on-yarn.md                         |  22 +-
 project/MimaExcludes.scala                      |   5 +-
 ...ploy.yarn.security.ServiceCredentialProvider |   3 +
 .../deploy/yarn/AMDelegationTokenRenewer.scala  | 210 ----------------
 .../spark/deploy/yarn/ApplicationMaster.scala   |  13 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  63 +++--
 .../yarn/ExecutorDelegationTokenUpdater.scala   | 114 ---------
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 243 +------------------
 .../org/apache/spark/deploy/yarn/config.scala   |  10 +
 .../yarn/security/AMCredentialRenewer.scala     | 235 ++++++++++++++++++
 .../ConfigurableCredentialManager.scala         | 105 ++++++++
 .../yarn/security/CredentialUpdater.scala       | 130 ++++++++++
 .../yarn/security/HBaseCredentialProvider.scala |  74 ++++++
 .../yarn/security/HDFSCredentialProvider.scala  | 110 +++++++++
 .../yarn/security/HiveCredentialProvider.scala  | 129 ++++++++++
 .../security/ServiceCredentialProvider.scala    |  57 +++++
 .../cluster/YarnClientSchedulerBackend.scala    |   4 +-
 ...ploy.yarn.security.ServiceCredentialProvider |   1 +
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  |  97 +-------
 .../ConfigurableCredentialManagerSuite.scala    | 150 ++++++++++++
 .../security/HDFSCredentialProviderSuite.scala  |  71 ++++++
 25 files changed, 1154 insertions(+), 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 671e8e4..3f54ecc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,21 +17,19 @@
 
 package org.apache.spark.deploy
 
-import java.io.{ByteArrayInputStream, DataInputStream, IOException}
+import java.io.IOException
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date}
 
 import scala.collection.JavaConverters._
-import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.fs.FileSystem.Statistics
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -40,7 +38,6 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -277,29 +274,6 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-  /**
-   * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
-   * is valid the latest)?
-   * This will return -ve (or 0) value if the fraction of validity has already expired.
-   */
-  def getTimeFromNowToRenewal(
-      sparkConf: SparkConf,
-      fraction: Double,
-      credentials: Credentials): Long = {
-    val now = System.currentTimeMillis()
-
-    val renewalInterval = sparkConf.get(TOKEN_RENEWAL_INTERVAL).get
-
-    credentials.getAllTokens.asScala
-      .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      .map { t =>
-        val identifier = new DelegationTokenIdentifier()
-        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-        (identifier.getIssueDate + fraction * renewalInterval).toLong - now
-      }.foldLeft(0L)(math.max)
-  }
-
-
   private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
     val fileName = credentialsPath.getName
     fileName.substring(
@@ -337,15 +311,15 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
-   * Start a thread to periodically update the current user's credentials with new delegation
-   * tokens so that writes to HDFS do not fail.
+   * Start a thread to periodically update the current user's credentials with new credentials so
+   * that access to secured service does not fail.
    */
-  private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf) {}
+  private[spark] def startCredentialUpdater(conf: SparkConf) {}
 
   /**
-   * Stop the thread that does the delegation token updates.
+   * Stop the thread that does the credential updates.
    */
-  private[spark] def stopExecutorDelegationTokenRenewer() {}
+  private[spark] def stopCredentialUpdater() {}
 
   /**
    * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e30839c..391b97d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -203,7 +203,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       if (driverConf.contains("spark.yarn.credentials.file")) {
         logInfo("Will periodically update credentials from: " +
           driverConf.get("spark.yarn.credentials.file"))
-        SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
+        SparkHadoopUtil.get.startCredentialUpdater(driverConf)
       }
 
       val env = SparkEnv.createExecutorEnv(
@@ -215,7 +215,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
-      SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
+      SparkHadoopUtil.get.stopCredentialUpdater()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index cb75716..e646d99 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.internal
 
-import java.util.concurrent.TimeUnit
-
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.ByteUnit
 
@@ -82,11 +80,6 @@ package object config {
     .doc("Name of the Kerberos principal.")
     .stringConf.createOptional
 
-  private[spark] val TOKEN_RENEWAL_INTERVAL = ConfigBuilder("spark.yarn.token.renewal.interval")
-    .internal()
-    .timeConf(TimeUnit.MILLISECONDS)
-    .createOptional
-
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .intConf
     .createOptional

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/dev/.rat-excludes
----------------------------------------------------------------------
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 0c86671..9171f38 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -100,3 +100,4 @@ spark-deps-.*
 org.apache.spark.scheduler.ExternalClusterManager
 .*\.sql
 .Rbuildignore
+org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index befd3ea..cd18808 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -461,15 +461,14 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.security.tokens.${service}.enabled</code></td>
+  <td><code>spark.yarn.security.credentials.${service}.enabled</code></td>
   <td><code>true</code></td>
   <td>
-  Controls whether to retrieve delegation tokens for non-HDFS services when security is enabled.
-  By default, delegation tokens for all supported services are retrieved when those services are
+  Controls whether to obtain credentials for services when security is enabled.
+  By default, credentials for all supported services are retrieved when those services are
   configured, but it's possible to disable that behavior if it somehow conflicts with the
-  application being run.
-  <p/>
-  Currently supported services are: <code>hive</code>, <code>hbase</code>
+  application being run. For further details please see
+  [Running in a Secure Cluster](running-on-yarn.html#running-in-a-secure-cluster)
   </td>
 </tr>
 <tr>
@@ -525,11 +524,11 @@ token for the cluster's HDFS filesystem, and potentially for HBase and Hive.
 
 An HBase token will be obtained if HBase is in on classpath, the HBase configuration declares
 the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
-and `spark.yarn.security.tokens.hbase.enabled` is not set to `false`.
+and `spark.yarn.security.credentials.hbase.enabled` is not set to `false`.
 
 Similarly, a Hive token will be obtained if Hive is on the classpath, its configuration
 includes a URI of the metadata store in `"hive.metastore.uris`, and
-`spark.yarn.security.tokens.hive.enabled` is not set to `false`.
+`spark.yarn.security.credentials.hive.enabled` is not set to `false`.
 
 If an application needs to interact with other secure HDFS clusters, then
 the tokens needed to access these clusters must be explicitly requested at
@@ -539,6 +538,13 @@ launch time. This is done by listing them in the `spark.yarn.access.namenodes` p
 spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
 ```
 
+Spark supports integrating with other security-aware services through Java Services mechanism (see
+`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`
+should be available to Spark by listing their names in the corresponding file in the jar's
+`META-INF/services` directory. These plug-ins can be disabled by setting
+`spark.yarn.security.tokens.{service}.enabled` to `false`, where `{service}` is the name of
+credential provider.
+
 ## Configuring the External Shuffle Service
 
 To start the Spark Shuffle Service on each `NodeManager` in your YARN cluster, follow these

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a201d7f..688218f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -784,7 +784,10 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.jdbc"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.parquetFile"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.applySchema")
-    )
+    ) ++ Seq(
+        // [SPARK-14743] Improve delegation token handling in secure cluster
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal")
+      )
   }
 
   def excludes(version: String) = version match {

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git a/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
new file mode 100644
index 0000000..22ead56
--- /dev/null
+++ b/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -0,0 +1,3 @@
+org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
+org.apache.spark.deploy.yarn.security.HiveCredentialProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
deleted file mode 100644
index 310a7a6..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.yarn
-
-import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{Executors, TimeUnit}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.util.ThreadUtils
-
-/*
- * The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while writing to secure HDFS. The
- * scheduleLoginFromKeytab method is called on the driver when the
- * CoarseGrainedScheduledBackend starts up. This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original delegation tokens used for the container
- * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
- * pre-specified location - the prefix of which is specified in the sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
- * to a new file, with a monotonically increasing suffix). After this, the credentials are
- * updated once 75% of the new tokens renewal interval has elapsed.
- *
- * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
- * validity of the original tokens has elapsed. At that time the executor finds the
- * credentials file with the latest timestamp and checks if it has read those credentials
- * before (by keeping track of the suffix of the last file it read). If a new file has
- * appeared, it will read the credentials and update the currently running UGI with it. This
- * process happens again once 80% of the validity of this has expired.
- */
-private[yarn] class AMDelegationTokenRenewer(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration) extends Logging {
-
-  private var lastCredentialsFileSuffix = 0
-
-  private val delegationTokenRenewer =
-    Executors.newSingleThreadScheduledExecutor(
-      ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
-
-  private val hadoopUtil = YarnSparkHadoopUtil.get
-
-  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
-  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
-  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
-  private val freshHadoopConf =
-    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
-
-  /**
-   * Schedule a login from the keytab and principal set using the --principal and --keytab
-   * arguments to spark-submit. This login happens only when the credentials of the current user
-   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
-   * SparkConf to do the login. This method is a no-op in non-YARN mode.
-   *
-   */
-  private[spark] def scheduleLoginFromKeytab(): Unit = {
-    val principal = sparkConf.get(PRINCIPAL).get
-    val keytab = sparkConf.get(KEYTAB).get
-
-    /**
-     * Schedule re-login and creation of new tokens. If tokens have already expired, this method
-     * will synchronously create new ones.
-     */
-    def scheduleRenewal(runnable: Runnable): Unit = {
-      val credentials = UserGroupInformation.getCurrentUser.getCredentials
-      val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
-      // Run now!
-      if (renewalInterval <= 0) {
-        logInfo("HDFS tokens have expired, creating new tokens now.")
-        runnable.run()
-      } else {
-        logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
-        delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
-      }
-    }
-
-    // This thread periodically runs on the driver to update the delegation tokens on HDFS.
-    val driverTokenRenewerRunnable =
-      new Runnable {
-        override def run(): Unit = {
-          try {
-            writeNewTokensToHDFS(principal, keytab)
-            cleanupOldFiles()
-          } catch {
-            case e: Exception =>
-              // Log the error and try to write new tokens back in an hour
-              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
-                "hour! If this happens too often tasks will fail.", e)
-              delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
-              return
-          }
-          scheduleRenewal(this)
-        }
-      }
-    // Schedule update of credentials. This handles the case of updating the tokens right now
-    // as well, since the renewal interval will be 0, and the thread will get scheduled
-    // immediately.
-    scheduleRenewal(driverTokenRenewerRunnable)
-  }
-
-  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
-  // least numFilesToKeep files are kept for safety
-  private def cleanupOldFiles(): Unit = {
-    import scala.concurrent.duration._
-    try {
-      val remoteFs = FileSystem.get(freshHadoopConf)
-      val credentialsPath = new Path(credentialsFile)
-      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
-      hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent,
-        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .dropRight(numFilesToKeep)
-        .takeWhile(_.getModificationTime < thresholdTime)
-        .foreach(x => remoteFs.delete(x.getPath, true))
-    } catch {
-      // Such errors are not fatal, so don't throw. Make sure they are logged though
-      case e: Exception =>
-        logWarning("Error while attempting to cleanup old tokens. If you are seeing many such " +
-          "warnings there may be an issue with your HDFS cluster.", e)
-    }
-  }
-
-  private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {
-    // Keytab is copied by YARN to the working directory of the AM, so full path is
-    // not needed.
-
-    // HACK:
-    // HDFS will not issue new delegation tokens, if the Credentials object
-    // passed in already has tokens for that FS even if the tokens are expired (it really only
-    // checks if there are tokens for the service, and not if they are valid). So the only real
-    // way to get new tokens is to make sure a different Credentials object is used each time to
-    // get new tokens and then the new tokens are copied over the current user's Credentials.
-    // So:
-    // - we login as a different user and get the UGI
-    // - use that UGI to get the tokens (see doAs block below)
-    // - copy the tokens over to the current user's credentials (this will overwrite the tokens
-    // in the current user's Credentials object for this FS).
-    // The login to KDC happens each time new tokens are required, but this is rare enough to not
-    // have to worry about (like once every day or so). This makes this code clearer than having
-    // to login and then relogin every time (the HDFS API may not relogin since we don't use this
-    // UGI directly for HDFS communication.
-    logInfo(s"Attempting to login to KDC using principal: $principal")
-    val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
-    logInfo("Successfully logged into KDC.")
-    val tempCreds = keytabLoggedInUGI.getCredentials
-    val credentialsPath = new Path(credentialsFile)
-    val dst = credentialsPath.getParent
-    keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
-      // Get a copy of the credentials
-      override def run(): Void = {
-        val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-        hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
-        hadoopUtil.obtainTokenForHiveMetastore(sparkConf, freshHadoopConf, tempCreds)
-        hadoopUtil.obtainTokenForHBase(sparkConf, freshHadoopConf, tempCreds)
-        null
-      }
-    })
-    // Add the temp credentials back to the original ones.
-    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    val remoteFs = FileSystem.get(freshHadoopConf)
-    // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
-    // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
-    // and update the lastCredentialsFileSuffix.
-    if (lastCredentialsFileSuffix == 0) {
-      hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent,
-        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .lastOption.foreach { status =>
-        lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
-      }
-    }
-    val nextSuffix = lastCredentialsFileSuffix + 1
-    val tokenPathStr =
-      credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
-    val tokenPath = new Path(tokenPathStr)
-    val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-    logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
-    logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
-    remoteFs.rename(tempTokenPath, tokenPath)
-    logInfo("Delegation token file rename complete.")
-    lastCredentialsFileSuffix = nextSuffix
-  }
-
-  def stop(): Unit = {
-    delegationTokenRenewer.shutdown()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c371ad6..614278c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -35,6 +35,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rpc._
@@ -112,7 +113,7 @@ private[spark] class ApplicationMaster(
   // Fields used in cluster mode.
   private val sparkContextRef = new AtomicReference[SparkContext](null)
 
-  private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
+  private var credentialRenewer: AMCredentialRenewer = _
 
   // Load the list of localized files set by the client. This is used when launching executors,
   // and is loaded here so that these configs don't pollute the Web UI's environment page in
@@ -235,10 +236,11 @@ private[spark] class ApplicationMaster(
       // If the credentials file config is present, we must periodically renew tokens. So create
       // a new AMDelegationTokenRenewer
       if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
-        delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
         // If a principal and keytab have been set, use that to create new credentials for executors
         // periodically
-        delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
+        credentialRenewer =
+          new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
+        credentialRenewer.scheduleLoginFromKeytab()
       }
 
       if (isClusterMode) {
@@ -305,7 +307,10 @@ private[spark] class ApplicationMaster(
           logDebug("shutting down user thread")
           userClassThread.interrupt()
         }
-        if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
+        if (!inShutdown && credentialRenewer != null) {
+          credentialRenewer.stop()
+          credentialRenewer = null
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 348f9bf..e3572d7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException,
-  OutputStreamWriter}
+import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
@@ -35,7 +34,6 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -52,6 +50,7 @@ import org.apache.hadoop.yarn.util.Records
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
@@ -122,6 +121,8 @@ private[spark] class Client(
   private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
     .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
 
+  private val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)
   }
@@ -390,8 +391,31 @@ private[spark] class Client(
     // Upload Spark and the application JAR to the remote file system if necessary,
     // and add them as local resources to the application master.
     val fs = destDir.getFileSystem(hadoopConf)
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + destDir
-    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
+
+    // Merge credentials obtained from registered providers
+    val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)
+
+    if (credentials != null) {
+      logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
+    }
+
+    // If we use principal and keytab to login, also credentials can be renewed some time
+    // after current time, we should pass the next renewal and updating time to credential
+    // renewer and updater.
+    if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
+      nearestTimeOfNextRenewal != Long.MaxValue) {
+
+      // Valid renewal time is 75% of next renewal time, and the valid update time will be
+      // slightly later then renewal time (80% of next renewal time). This is to make sure
+      // credentials are renewed and updated before expired.
+      val currTime = System.currentTimeMillis()
+      val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
+      val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime
+
+      sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
+      sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
+    }
+
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
     // multiple times, YARN will fail to launch containers for the app with an internal
     // error.
@@ -400,11 +424,6 @@ private[spark] class Client(
     // same name but different path files are added multiple time, YARN will fail to launch
     // containers for the app with an internal error.
     val distributedNames = new HashSet[String]
-    YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
-    YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
-    if (credentials != null) {
-      logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
-    }
 
     val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
       .getOrElse(fs.getDefaultReplication(destDir))
@@ -717,28 +736,6 @@ private[spark] class Client(
   }
 
   /**
-   * Get the renewal interval for tokens.
-   */
-  private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
-    // We cannot use the tokens generated above since those have renewer yarn. Trying to renew
-    // those will fail with an access control issue. So create new tokens with the logged in
-    // user as renewer.
-    val creds = new Credentials()
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
-    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
-      nns, hadoopConf, creds, sparkConf.get(PRINCIPAL))
-    val t = creds.getAllTokens.asScala
-      .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      .head
-    val newExpiration = t.renew(hadoopConf)
-    val identifier = new DelegationTokenIdentifier()
-    identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-    val interval = newExpiration - identifier.getIssueDate
-    logInfo(s"Renewal Interval set to $interval")
-    interval
-  }
-
-  /**
    * Set up the environment for launching our ApplicationMaster container.
    */
   private def setupLaunchEnv(
@@ -754,8 +751,6 @@ private[spark] class Client(
       val credentialsFile = "credentials-" + UUID.randomUUID().toString
       sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
       logInfo(s"Credentials file set to: $credentialsFile")
-      val renewalInterval = getTokenRenewalInterval(stagingDirPath)
-      sparkConf.set(TOKEN_RENEWAL_INTERVAL, renewalInterval)
     }
 
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
deleted file mode 100644
index 3aa6407..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.yarn
-
-import java.util.concurrent.{Executors, TimeUnit}
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-private[spark] class ExecutorDelegationTokenUpdater(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration) extends Logging {
-
-  @volatile private var lastCredentialsFileSuffix = 0
-
-  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
-  private val freshHadoopConf =
-    SparkHadoopUtil.get.getConfBypassingFSCache(
-      hadoopConf, new Path(credentialsFile).toUri.getScheme)
-
-  private val delegationTokenRenewer =
-    Executors.newSingleThreadScheduledExecutor(
-      ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
-
-  // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
-  private val executorUpdaterRunnable =
-    new Runnable {
-      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
-    }
-
-  def updateCredentialsIfRequired(): Unit = {
-    try {
-      val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(freshHadoopConf)
-      SparkHadoopUtil.get.listFilesSorted(
-        remoteFs, credentialsFilePath.getParent,
-        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .lastOption.foreach { credentialsStatus =>
-        val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
-        if (suffix > lastCredentialsFileSuffix) {
-          logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
-          val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
-          lastCredentialsFileSuffix = suffix
-          UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
-          logInfo("Tokens updated from credentials file.")
-        } else {
-          // Check every hour to see if new credentials arrived.
-          logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
-            "tokens yet, will check again in an hour.")
-          delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
-          return
-        }
-      }
-      val timeFromNowToRenewal =
-        SparkHadoopUtil.get.getTimeFromNowToRenewal(
-          sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
-      if (timeFromNowToRenewal <= 0) {
-        // We just checked for new credentials but none were there, wait a minute and retry.
-        // This handles the shutdown case where the staging directory may have been removed(see
-        // SPARK-12316 for more details).
-        delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.MINUTES)
-      } else {
-        logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
-        delegationTokenRenewer.schedule(
-          executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
-      }
-    } catch {
-      // Since the file may get deleted while we are reading it, catch the Exception and come
-      // back in an hour to try again
-      case NonFatal(e) =>
-        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
-        delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
-    }
-  }
-
-  private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
-    val stream = remoteFs.open(tokenPath)
-    try {
-      val newCredentials = new Credentials()
-      newCredentials.readTokenStorageStream(stream)
-      newCredentials
-    } finally {
-      stream.close()
-    }
-  }
-
-  def stop(): Unit = {
-    delegationTokenRenewer.shutdown()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 156a7a3..cc53b1b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,25 +18,18 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.lang.reflect.UndeclaredThrowableException
 import java.nio.charset.StandardCharsets.UTF_8
-import java.security.PrivilegedExceptionAction
 import java.util.regex.Matcher
 import java.util.regex.Pattern
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.reflect.runtime._
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.{JobConf, Master}
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
@@ -45,7 +38,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater}
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.util.Utils
@@ -55,7 +48,7 @@ import org.apache.spark.util.Utils
  */
 class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
-  private var tokenRenewer: Option[ExecutorDelegationTokenUpdater] = None
+  private var credentialUpdater: CredentialUpdater = _
 
   override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
     dest.addCredentials(source.getCredentials())
@@ -96,237 +89,23 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     if (credentials != null) credentials.getSecretKey(new Text(key)) else null
   }
 
-  /**
-   * Get the list of namenodes the user may access.
-   */
-  def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
-    sparkConf.get(NAMENODES_TO_ACCESS)
-      .map(new Path(_))
-      .toSet
-  }
-
-  def getTokenRenewer(conf: Configuration): String = {
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    logDebug("delegation token renewer is: " + delegTokenRenewer)
-    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
-      logError(errorMessage)
-      throw new SparkException(errorMessage)
-    }
-    delegTokenRenewer
-  }
-
-  /**
-   * Obtains tokens for the namenodes passed in and adds them to the credentials.
-   */
-  def obtainTokensForNamenodes(
-    paths: Set[Path],
-    conf: Configuration,
-    creds: Credentials,
-    renewer: Option[String] = None
-  ): Unit = {
-    if (UserGroupInformation.isSecurityEnabled()) {
-      val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf))
-      paths.foreach { dst =>
-        val dstFs = dst.getFileSystem(conf)
-        logInfo("getting token for namenode: " + dst)
-        dstFs.addDelegationTokens(delegTokenRenewer, creds)
-      }
-    }
-  }
-
-  /**
-   * Obtains token for the Hive metastore and adds them to the credentials.
-   */
-  def obtainTokenForHiveMetastore(
-      sparkConf: SparkConf,
-      conf: Configuration,
-      credentials: Credentials) {
-    if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
-      YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
-        credentials.addToken(new Text("hive.server2.delegation.token"), _)
-      }
-    }
+  private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = {
+    credentialUpdater =
+      new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater()
+    credentialUpdater.start()
   }
 
-  /**
-   * Obtain a security token for HBase.
-   */
-  def obtainTokenForHBase(
-      sparkConf: SparkConf,
-      conf: Configuration,
-      credentials: Credentials): Unit = {
-    if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) {
-      YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token =>
-        credentials.addToken(token.getService, token)
-        logInfo("Added HBase security token to credentials.")
-      }
+  private[spark] override def stopCredentialUpdater(): Unit = {
+    if (credentialUpdater != null) {
+      credentialUpdater.stop()
+      credentialUpdater = null
     }
   }
 
-  /**
-   * Return whether delegation tokens should be retrieved for the given service when security is
-   * enabled. By default, tokens are retrieved, but that behavior can be changed by setting
-   * a service-specific configuration.
-   */
-  private def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
-    conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true)
-  }
-
-  private[spark] override def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit = {
-    tokenRenewer = Some(new ExecutorDelegationTokenUpdater(sparkConf, conf))
-    tokenRenewer.get.updateCredentialsIfRequired()
-  }
-
-  private[spark] override def stopExecutorDelegationTokenRenewer(): Unit = {
-    tokenRenewer.foreach(_.stop())
-  }
-
   private[spark] def getContainerId: ContainerId = {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     ConverterUtils.toContainerId(containerIdString)
   }
-
-  /**
-   * Obtains token for the Hive metastore, using the current user as the principal.
-   * Some exceptions are caught and downgraded to a log message.
-   * @param conf hadoop configuration; the Hive configuration will be based on this
-   * @return a token, or `None` if there's no need for a token (no metastore URI or principal
-   *         in the config), or if a binding exception was caught and downgraded.
-   */
-  def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
-    try {
-      obtainTokenForHiveMetastoreInner(conf)
-    } catch {
-      case e: ClassNotFoundException =>
-        logInfo(s"Hive class not found $e")
-        logDebug("Hive class not found", e)
-        None
-    }
-  }
-
-  /**
-   * Inner routine to obtains token for the Hive metastore; exceptions are raised on any problem.
-   * @param conf hadoop configuration; the Hive configuration will be based on this.
-   * @param username the username of the principal requesting the delegating token.
-   * @return a delegation token
-   */
-  private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
-      Option[Token[DelegationTokenIdentifier]] = {
-    val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
-
-    // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
-    // to a Configuration and used without reflection
-    val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
-    // using the (Configuration, Class) constructor allows the current configuration to be included
-    // in the hive config.
-    val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
-      classOf[Object].getClass)
-    val hiveConf = ctor.newInstance(conf, hiveConfClass).asInstanceOf[Configuration]
-    val metastoreUri = hiveConf.getTrimmed("hive.metastore.uris", "")
-
-    // Check for local metastore
-    if (metastoreUri.nonEmpty) {
-      val principalKey = "hive.metastore.kerberos.principal"
-      val principal = hiveConf.getTrimmed(principalKey, "")
-      require(principal.nonEmpty, "Hive principal $principalKey undefined")
-      val currentUser = UserGroupInformation.getCurrentUser()
-      logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
-        s"$principal at $metastoreUri")
-      val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
-      val closeCurrent = hiveClass.getMethod("closeCurrent")
-      try {
-        // get all the instance methods before invoking any
-        val getDelegationToken = hiveClass.getMethod("getDelegationToken",
-          classOf[String], classOf[String])
-        val getHive = hiveClass.getMethod("get", hiveConfClass)
-
-        doAsRealUser {
-          val hive = getHive.invoke(null, hiveConf)
-          val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
-            .asInstanceOf[String]
-          val hive2Token = new Token[DelegationTokenIdentifier]()
-          hive2Token.decodeFromUrlString(tokenStr)
-          Some(hive2Token)
-        }
-      } finally {
-        Utils.tryLogNonFatalError {
-          closeCurrent.invoke(null)
-        }
-      }
-    } else {
-      logDebug("HiveMetaStore configured in localmode")
-      None
-    }
-  }
-
-  /**
-   * Obtain a security token for HBase.
-   *
-   * Requirements
-   *
-   * 1. `"hbase.security.authentication" == "kerberos"`
-   * 2. The HBase classes `HBaseConfiguration` and `TokenUtil` could be loaded
-   * and invoked.
-   *
-   * @param conf Hadoop configuration; an HBase configuration is created
-   *             from this.
-   * @return a token if the requirements were met, `None` if not.
-   */
-  def obtainTokenForHBase(conf: Configuration): Option[Token[TokenIdentifier]] = {
-    try {
-      obtainTokenForHBaseInner(conf)
-    } catch {
-      case e: ClassNotFoundException =>
-        logInfo(s"HBase class not found $e")
-        logDebug("HBase class not found", e)
-        None
-    }
-  }
-
-  /**
-   * Obtain a security token for HBase if `"hbase.security.authentication" == "kerberos"`
-   *
-   * @param conf Hadoop configuration; an HBase configuration is created
-   *             from this.
-   * @return a token if one was needed
-   */
-  def obtainTokenForHBaseInner(conf: Configuration): Option[Token[TokenIdentifier]] = {
-    val mirror = universe.runtimeMirror(getClass.getClassLoader)
-    val confCreate = mirror.classLoader.
-      loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
-      getMethod("create", classOf[Configuration])
-    val obtainToken = mirror.classLoader.
-      loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
-      getMethod("obtainToken", classOf[Configuration])
-    val hbaseConf = confCreate.invoke(null, conf).asInstanceOf[Configuration]
-    if ("kerberos" == hbaseConf.get("hbase.security.authentication")) {
-      logDebug("Attempting to fetch HBase security token.")
-      Some(obtainToken.invoke(null, hbaseConf).asInstanceOf[Token[TokenIdentifier]])
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Run some code as the real logged in user (which may differ from the current user, for
-   * example, when using proxying).
-   */
-  private def doAsRealUser[T](fn: => T): T = {
-    val currentUser = UserGroupInformation.getCurrentUser()
-    val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
-
-   // For some reason the Scala-generated anonymous class ends up causing an
-   // UndeclaredThrowableException, even if you annotate the method with @throws.
-   try {
-      realUser.doAs(new PrivilegedExceptionAction[T]() {
-        override def run(): T = fn
-      })
-    } catch {
-      case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
-    }
-  }
-
 }
 
 object YarnSparkHadoopUtil {

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 49c0177..ca8c890 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -319,6 +319,16 @@ package object config {
     .stringConf
     .createOptional
 
+  private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime")
+    .internal()
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefault(Long.MaxValue)
+
+  private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime")
+    .internal()
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefault(Long.MaxValue)
+
   // The list of cache-related config entries. This is used by Client and the AM to clean
   // up the environment so that these settings do not appear on the web UI.
   private[yarn] val CACHE_CONFIGS = Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
new file mode 100644
index 0000000..7e76f40
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn.security
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The following methods are primarily meant to make sure long-running apps like Spark
+ * Streaming apps can run without interruption while accessing secured services. The
+ * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
+ * This method wakes up a thread that logs into the KDC
+ * once 75% of the renewal interval of the original credentials used for the container
+ * has elapsed. It then obtains new credentials and writes them to HDFS in a
+ * pre-specified location - the prefix of which is specified in the sparkConf by
+ * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
+ * - each update goes to a new file, with a monotonically increasing suffix), also the
+ * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
+ * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
+ *
+ * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
+ * called once 80% of the validity of the original credentials has elapsed. At that time the
+ * executor finds the credentials file with the latest timestamp and checks if it has read those
+ * credentials before (by keeping track of the suffix of the last file it read). If a new file has
+ * appeared, it will read the credentials and update the currently running UGI with it. This
+ * process happens again once 80% of the validity of this has expired.
+ */
+private[yarn] class AMCredentialRenewer(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration,
+    credentialManager: ConfigurableCredentialManager) extends Logging {
+
+  private var lastCredentialsFileSuffix = 0
+
+  private val credentialRenewer =
+    Executors.newSingleThreadScheduledExecutor(
+      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  private val hadoopUtil = YarnSparkHadoopUtil.get
+
+  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
+  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
+  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
+  private val freshHadoopConf =
+    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
+
+  @volatile private var timeOfNextRenewal = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+
+  /**
+   * Schedule a login from the keytab and principal set using the --principal and --keytab
+   * arguments to spark-submit. This login happens only when the credentials of the current user
+   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
+   * SparkConf to do the login. This method is a no-op in non-YARN mode.
+   *
+   */
+  private[spark] def scheduleLoginFromKeytab(): Unit = {
+    val principal = sparkConf.get(PRINCIPAL).get
+    val keytab = sparkConf.get(KEYTAB).get
+
+    /**
+     * Schedule re-login and creation of new credentials. If credentials have already expired, this
+     * method will synchronously create new ones.
+     */
+    def scheduleRenewal(runnable: Runnable): Unit = {
+      // Run now!
+      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      if (remainingTime <= 0) {
+        logInfo("Credentials have expired, creating new ones now.")
+        runnable.run()
+      } else {
+        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+        credentialRenewer.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+      }
+    }
+
+    // This thread periodically runs on the AM to update the credentials on HDFS.
+    val credentialRenewerRunnable =
+      new Runnable {
+        override def run(): Unit = {
+          try {
+            writeNewCredentialsToHDFS(principal, keytab)
+            cleanupOldFiles()
+          } catch {
+            case e: Exception =>
+              // Log the error and try to write new tokens back in an hour
+              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
+                "hour! If this happens too often tasks will fail.", e)
+              credentialRenewer.schedule(this, 1, TimeUnit.HOURS)
+              return
+          }
+          scheduleRenewal(this)
+        }
+      }
+    // Schedule update of credentials. This handles the case of updating the credentials right now
+    // as well, since the renewal interval will be 0, and the thread will get scheduled
+    // immediately.
+    scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
+  // least numFilesToKeep files are kept for safety
+  private def cleanupOldFiles(): Unit = {
+    import scala.concurrent.duration._
+    try {
+      val remoteFs = FileSystem.get(freshHadoopConf)
+      val credentialsPath = new Path(credentialsFile)
+      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
+      hadoopUtil.listFilesSorted(
+        remoteFs, credentialsPath.getParent,
+        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+        .dropRight(numFilesToKeep)
+        .takeWhile(_.getModificationTime < thresholdTime)
+        .foreach(x => remoteFs.delete(x.getPath, true))
+    } catch {
+      // Such errors are not fatal, so don't throw. Make sure they are logged though
+      case e: Exception =>
+        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
+          "such warnings there may be an issue with your HDFS cluster.", e)
+    }
+  }
+
+  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
+    // Keytab is copied by YARN to the working directory of the AM, so full path is
+    // not needed.
+
+    // HACK:
+    // HDFS will not issue new delegation tokens, if the Credentials object
+    // passed in already has tokens for that FS even if the tokens are expired (it really only
+    // checks if there are tokens for the service, and not if they are valid). So the only real
+    // way to get new tokens is to make sure a different Credentials object is used each time to
+    // get new tokens and then the new tokens are copied over the current user's Credentials.
+    // So:
+    // - we login as a different user and get the UGI
+    // - use that UGI to get the tokens (see doAs block below)
+    // - copy the tokens over to the current user's credentials (this will overwrite the tokens
+    // in the current user's Credentials object for this FS).
+    // The login to KDC happens each time new tokens are required, but this is rare enough to not
+    // have to worry about (like once every day or so). This makes this code clearer than having
+    // to login and then relogin every time (the HDFS API may not relogin since we don't use this
+    // UGI directly for HDFS communication.
+    logInfo(s"Attempting to login to KDC using principal: $principal")
+    val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+    logInfo("Successfully logged into KDC.")
+    val tempCreds = keytabLoggedInUGI.getCredentials
+    val credentialsPath = new Path(credentialsFile)
+    val dst = credentialsPath.getParent
+    var nearestNextRenewalTime = Long.MaxValue
+    keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
+      // Get a copy of the credentials
+      override def run(): Void = {
+        nearestNextRenewalTime = credentialManager.obtainCredentials(freshHadoopConf, tempCreds)
+        null
+      }
+    })
+
+    val currTime = System.currentTimeMillis()
+    val timeOfNextUpdate = if (nearestNextRenewalTime <= currTime) {
+      // If next renewal time is earlier than current time, we set next renewal time to current
+      // time, this will trigger next renewal immediately. Also set next update time to current
+      // time. There still has a gap between token renewal and update will potentially introduce
+      // issue.
+      logWarning(s"Next credential renewal time ($nearestNextRenewalTime) is earlier than " +
+        s"current time ($currTime), which is unexpected, please check your credential renewal " +
+        "related configurations in the target services.")
+      timeOfNextRenewal = currTime
+      currTime
+    } else {
+      // Next valid renewal time is about 75% of credential renewal time, and update time is
+      // slightly later than valid renewal time (80% of renewal time).
+      timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong
+      ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong
+    }
+
+    // Add the temp credentials back to the original ones.
+    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
+    val remoteFs = FileSystem.get(freshHadoopConf)
+    // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
+    // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
+    // and update the lastCredentialsFileSuffix.
+    if (lastCredentialsFileSuffix == 0) {
+      hadoopUtil.listFilesSorted(
+        remoteFs, credentialsPath.getParent,
+        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+        .lastOption.foreach { status =>
+        lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
+      }
+    }
+    val nextSuffix = lastCredentialsFileSuffix + 1
+
+    val tokenPathStr =
+      credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
+        timeOfNextUpdate.toLong.toString + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
+          nextSuffix
+    val tokenPath = new Path(tokenPathStr)
+    val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+
+    logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+    credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
+    logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
+    remoteFs.rename(tempTokenPath, tokenPath)
+    logInfo("Delegation token file rename complete.")
+    lastCredentialsFileSuffix = nextSuffix
+  }
+
+  def stop(): Unit = {
+    credentialRenewer.shutdown()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
new file mode 100644
index 0000000..c4c07b4
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * A ConfigurableCredentialManager to manage all the registered credential providers and offer
+ * APIs for other modules to obtain credentials as well as renewal time. By default
+ * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
+ * be loaded in if not explicitly disabled, any plugged-in credential provider wants to be
+ * managed by ConfigurableCredentialManager needs to implement [[ServiceCredentialProvider]]
+ * interface and put into resources/META-INF/services to be loaded by ServiceLoader.
+ *
+ * Also each credential provider is controlled by
+ * spark.yarn.security.credentials.{service}.enabled, it will not be loaded in if set to false.
+ */
+private[yarn] final class ConfigurableCredentialManager(
+    sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
+  private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
+  private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
+
+  // Maintain all the registered credential providers
+  private val credentialProviders = {
+    val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
+      Utils.getContextOrSparkClassLoader).asScala
+
+    // Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false.
+    providers.filter { p =>
+      sparkConf.getOption(providerEnabledConfig.format(p.serviceName))
+        .orElse {
+          sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c =>
+            logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " +
+              s"using ${providerEnabledConfig.format(p.serviceName)} instead")
+            c
+          }
+        }.map(_.toBoolean).getOrElse(true)
+    }.map { p => (p.serviceName, p) }.toMap
+  }
+
+  /**
+   * Get credential provider for the specified service.
+   */
+  def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
+    credentialProviders.get(service)
+  }
+
+  /**
+   * Obtain credentials from all the registered providers.
+   * @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable,
+   *         otherwise the nearest renewal time of any credentials will be returned.
+   */
+  def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Long = {
+    credentialProviders.values.flatMap { provider =>
+      if (provider.credentialsRequired(hadoopConf)) {
+        provider.obtainCredentials(hadoopConf, sparkConf, creds)
+      } else {
+        logDebug(s"Service ${provider.serviceName} does not require a token." +
+          s" Check your configuration to see if security is disabled or not.")
+        None
+      }
+    }.foldLeft(Long.MaxValue)(math.min)
+  }
+
+  /**
+   * Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this
+   * instance when it is not used. AM will use it to renew credentials periodically.
+   */
+  def credentialRenewer(): AMCredentialRenewer = {
+    new AMCredentialRenewer(sparkConf, hadoopConf, this)
+  }
+
+  /**
+   * Create an [[CredentialUpdater]] instance, caller should be resposible to stop this intance
+   * when it is not used. Executors and driver (client mode) will use it to update credentials.
+   * periodically.
+   */
+  def credentialUpdater(): CredentialUpdater = {
+    new CredentialUpdater(sparkConf, hadoopConf, this)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
new file mode 100644
index 0000000..5df4fbd
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class CredentialUpdater(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration,
+    credentialManager: ConfigurableCredentialManager) extends Logging {
+
+  @volatile private var lastCredentialsFileSuffix = 0
+
+  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
+  private val freshHadoopConf =
+    SparkHadoopUtil.get.getConfBypassingFSCache(
+      hadoopConf, new Path(credentialsFile).toUri.getScheme)
+
+  private val credentialUpdater =
+    Executors.newSingleThreadScheduledExecutor(
+      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  // This thread wakes up and picks up new credentials from HDFS, if any.
+  private val credentialUpdaterRunnable =
+    new Runnable {
+      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
+    }
+
+  /** Start the credential updater task */
+  def start(): Unit = {
+    val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+    val remainingTime = startTime - System.currentTimeMillis()
+    if (remainingTime <= 0) {
+      credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES)
+    } else {
+      logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime millis.")
+      credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS)
+    }
+  }
+
+  private def updateCredentialsIfRequired(): Unit = {
+    val timeToNextUpdate = try {
+      val credentialsFilePath = new Path(credentialsFile)
+      val remoteFs = FileSystem.get(freshHadoopConf)
+      SparkHadoopUtil.get.listFilesSorted(
+        remoteFs, credentialsFilePath.getParent,
+        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+        .lastOption.map { credentialsStatus =>
+          val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
+          if (suffix > lastCredentialsFileSuffix) {
+            logInfo("Reading new credentials from " + credentialsStatus.getPath)
+            val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+            lastCredentialsFileSuffix = suffix
+            UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+            logInfo("Credentials updated from credentials file.")
+
+            val remainingTime = getTimeOfNextUpdateFromFileName(credentialsStatus.getPath)
+              - System.currentTimeMillis()
+            if (remainingTime <= 0) TimeUnit.MINUTES.toMillis(1) else remainingTime
+          } else {
+            // If current credential file is older than expected, sleep 1 hour and check again.
+            TimeUnit.HOURS.toMillis(1)
+          }
+      }.getOrElse {
+        // Wait for 1 minute to check again if there's no credential file currently
+        TimeUnit.MINUTES.toMillis(1)
+      }
+    } catch {
+      // Since the file may get deleted while we are reading it, catch the Exception and come
+      // back in an hour to try again
+      case NonFatal(e) =>
+        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
+        TimeUnit.HOURS.toMillis(1)
+    }
+
+    credentialUpdater.schedule(
+      credentialUpdaterRunnable, timeToNextUpdate, TimeUnit.MILLISECONDS)
+  }
+
+  private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
+    val stream = remoteFs.open(tokenPath)
+    try {
+      val newCredentials = new Credentials()
+      newCredentials.readTokenStorageStream(stream)
+      newCredentials
+    } finally {
+      stream.close()
+    }
+  }
+
+  private def getTimeOfNextUpdateFromFileName(credentialsPath: Path): Long = {
+    val name = credentialsPath.getName
+    val index = name.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
+    val slice = name.substring(0, index)
+    val last2index = slice.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
+    name.substring(last2index + 1, index).toLong
+  }
+
+  def stop(): Unit = {
+    credentialUpdater.shutdown()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
new file mode 100644
index 0000000..5571df0
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {
+
+  override def serviceName: String = "hbase"
+
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    try {
+      val mirror = universe.runtimeMirror(getClass.getClassLoader)
+      val obtainToken = mirror.classLoader.
+        loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
+        getMethod("obtainToken", classOf[Configuration])
+
+      logDebug("Attempting to fetch HBase security token.")
+      val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
+        .asInstanceOf[Token[_ <: TokenIdentifier]]
+      logInfo(s"Get token from HBase: ${token.toString}")
+      creds.addToken(token.getService, token)
+    } catch {
+      case NonFatal(e) =>
+        logDebug(s"Failed to get token from service $serviceName", e)
+    }
+
+    None
+  }
+
+  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
+    hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
+  }
+
+  private def hbaseConf(conf: Configuration): Configuration = {
+    try {
+      val mirror = universe.runtimeMirror(getClass.getClassLoader)
+      val confCreate = mirror.classLoader.
+        loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
+        getMethod("create", classOf[Configuration])
+      confCreate.invoke(null, conf).asInstanceOf[Configuration]
+    } catch {
+      case NonFatal(e) =>
+        logDebug("Fail to invoke HBaseConfiguration", e)
+        conf
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab648c00/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
new file mode 100644
index 0000000..8d06d73
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.io.{ByteArrayInputStream, DataInputStream}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
+  // Token renewal interval, this value will be set in the first call,
+  // if None means no token renewer specified, so cannot get token renewal interval.
+  private var tokenRenewalInterval: Option[Long] = null
+
+  override val serviceName: String = "hdfs"
+
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    // NameNode to access, used to get tokens from different FileSystems
+    nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
+      val dstFs = dst.getFileSystem(hadoopConf)
+      logInfo("getting token for namenode: " + dst)
+      dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
+    }
+
+    // Get the token renewal interval if it is not set. It will only be called once.
+    if (tokenRenewalInterval == null) {
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
+    }
+
+    // Get the time of next renewal.
+    tokenRenewalInterval.map { interval =>
+      creds.getAllTokens.asScala
+        .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+        .map { t =>
+          val identifier = new DelegationTokenIdentifier()
+          identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+          identifier.getIssueDate + interval
+      }.foldLeft(0L)(math.max)
+    }
+  }
+
+  private def getTokenRenewalInterval(
+      hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = {
+    // We cannot use the tokens generated with renewer yarn. Trying to renew
+    // those will fail with an access control issue. So create new tokens with the logged in
+    // user as renewer.
+    sparkConf.get(PRINCIPAL).map { renewer =>
+      val creds = new Credentials()
+      nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
+        val dstFs = dst.getFileSystem(hadoopConf)
+        dstFs.addDelegationTokens(renewer, creds)
+      }
+      val t = creds.getAllTokens.asScala
+        .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+        .head
+      val newExpiration = t.renew(hadoopConf)
+      val identifier = new DelegationTokenIdentifier()
+      identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+      val interval = newExpiration - identifier.getIssueDate
+      logInfo(s"Renewal Interval is $interval")
+      interval
+    }
+  }
+
+  private def getTokenRenewer(conf: Configuration): String = {
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    logDebug("delegation token renewer is: " + delegTokenRenewer)
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+      logError(errorMessage)
+      throw new SparkException(errorMessage)
+    }
+
+    delegTokenRenewer
+  }
+
+  private def nnsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get(NAMENODES_TO_ACCESS).map(new Path(_)).toSet +
+      sparkConf.get(STAGING_DIR).map(new Path(_))
+        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org