Posted to commits@spark.apache.org by va...@apache.org on 2017/11/15 23:53:16 UTC

spark git commit: [SPARK-21842][MESOS] Support Kerberos ticket renewal and creation in Mesos

Repository: spark
Updated Branches:
  refs/heads/master 2014e7a78 -> 1e8233541


[SPARK-21842][MESOS] Support Kerberos ticket renewal and creation in Mesos

## What changes were proposed in this pull request?
tl;dr: Add a class, `MesosHadoopDelegationTokenManager`, that updates delegation tokens on a schedule on behalf of Spark drivers and broadcasts the renewed credentials to the executors.

## The problem
We recently added Kerberos support to Mesos-based Spark jobs, as well as secrets support to the Mesos Dispatcher (SPARK-16742 and SPARK-20812, respectively). However, delegation tokens have a defined expiration, which poses a problem for long-running Spark jobs (e.g. Spark Streaming applications). YARN solves this with a thread that is scheduled to renew the tokens when they are 75% of the way to expiration; it then writes the tokens to HDFS for the executors to find (using a monotonically increasing file suffix).
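
For illustration, a minimal sketch of that 75%-of-interval rule (the method name `nextRenewalTime` is hypothetical; the helper this patch actually adds is `SparkHadoopUtil.getDateOfNextUpdate`, shown in the diff below):

```scala
// Sketch only: return the timestamp at which a given fraction of the remaining
// token lifetime has elapsed, e.g. fraction = 0.75 for "renew at 75%".
def nextRenewalTime(expirationDate: Long, fraction: Double): Long = {
  val now = System.currentTimeMillis()
  (now + fraction * (expirationDate - now)).toLong
}
```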

## This solution
We replace the current method in `CoarseGrainedSchedulerBackend`, which used to discard the token renewal time, with a protected method `fetchHadoopDelegationTokens`. The individual cluster backends are now responsible for overriding this method to fetch tokens and manage their renewal. The delegation tokens themselves are still part of `CoarseGrainedSchedulerBackend`, as before.
In the case of Mesos, renewed credentials are broadcast to the executors, as sketched below. This keeps all transfer of credentials within Spark (as opposed to Spark-to-HDFS), requires no writing of credentials to disk, and requires no garbage collection of old token files.
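
A rough sketch of that pattern (simplified from the diff below, not verbatim): the Mesos backend overrides `fetchHadoopDelegationTokens` to hand serialized tokens to newly registering executors, and renewed tokens are pushed to all running executors via an `UpdateDelegationTokens` RPC message.

```scala
// Simplified sketch of the Mesos backend hook and the broadcast path.
override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
  if (UserGroupInformation.isSecurityEnabled) {
    Some(hadoopDelegationTokenManager.getTokens())  // serialized Credentials
  } else {
    None
  }
}

// On renewal, the token manager sends the new tokens to the driver endpoint,
// which forwards them to every registered executor:
driverEndpoint.send(UpdateDelegationTokens(newTokens))
```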

## How was this patch tested?
Manually against a Kerberized HDFS cluster.

Thank you for the reviews.

Author: ArtRand <ar...@soe.ucsc.edu>

Closes #19272 from ArtRand/spark-21842-450-kerberos-ticket-renewal.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e823354
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e823354
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e823354

Branch: refs/heads/master
Commit: 1e82335413bc2384073ead0d6d581c862036d0f5
Parents: 2014e7a
Author: ArtRand <ar...@soe.ucsc.edu>
Authored: Wed Nov 15 15:53:05 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Nov 15 15:53:05 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  28 +++-
 .../security/HadoopDelegationTokenManager.scala |   3 +
 .../executor/CoarseGrainedExecutorBackend.scala |   9 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   3 +
 .../cluster/CoarseGrainedSchedulerBackend.scala |  30 +---
 .../cluster/mesos/MesosClusterManager.scala     |   2 +-
 .../MesosCoarseGrainedSchedulerBackend.scala    |  19 ++-
 .../MesosHadoopDelegationTokenManager.scala     | 157 +++++++++++++++++++
 .../yarn/security/AMCredentialRenewer.scala     |  21 +--
 9 files changed, 228 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 1fa10ab..17c7319 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -140,13 +140,24 @@ class SparkHadoopUtil extends Logging {
     if (!new File(keytabFilename).exists()) {
       throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
     } else {
-      logInfo("Attempting to login to Kerberos" +
-        s" using principal: ${principalName} and keytab: ${keytabFilename}")
+      logInfo("Attempting to login to Kerberos " +
+        s"using principal: ${principalName} and keytab: ${keytabFilename}")
       UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
     }
   }
 
   /**
+   * Add or overwrite the current user's credentials with serialized delegation tokens;
+   * also confirm that the correct Hadoop configuration is set.
+   */
+  private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
+    UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
+    val creds = deserialize(tokens)
+    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+    addCurrentUserCredentials(creds)
+  }
+
+  /**
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
    * return the bytes read on r since t.
@@ -463,6 +474,19 @@ object SparkHadoopUtil {
   }
 
   /**
+   * Given an expiration date (e.g. for Hadoop Delegation Tokens), return the date
+   * when a given fraction of the duration until the expiration date has passed.
+   * Formula: current time + (fraction * (time until expiration))
+   * @param expirationDate Drop-dead expiration date
+   * @param fraction fraction of the time until expiration
+   * @return Date when the fraction of the time until expiration has passed
+   */
+  private[spark] def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
+    val ct = System.currentTimeMillis
+    (ct + (fraction * (expirationDate - ct))).toLong
+  }
+
+  /**
    * Returns a Configuration object with Spark configuration applied on top. Unlike
    * the instance method, this will always return a Configuration instance, and not a
    * cluster manager-specific type.

http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 483d0de..116a686 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -109,6 +109,8 @@ private[spark] class HadoopDelegationTokenManager(
    * Writes delegation tokens to creds.  Delegation tokens are fetched from all registered
    * providers.
    *
+   * @param hadoopConf hadoop Configuration
+   * @param creds Credentials that will be updated in place (overwritten)
    * @return Time after which the fetched delegation tokens should be renewed.
    */
   def obtainDelegationTokens(
@@ -125,3 +127,4 @@ private[spark] class HadoopDelegationTokenManager(
     }.foldLeft(Long.MaxValue)(math.min)
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d27362a..acefc9d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -123,6 +123,10 @@ private[spark] class CoarseGrainedExecutorBackend(
           executor.stop()
         }
       }.start()
+
+    case UpdateDelegationTokens(tokenBytes) =>
+      logInfo(s"Received tokens of ${tokenBytes.length} bytes")
+      SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
   }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -219,9 +223,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         SparkHadoopUtil.get.startCredentialUpdater(driverConf)
       }
 
-      cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
-        val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
-        SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+      cfg.hadoopDelegationCreds.foreach { tokens =>
+        SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
       }
 
       val env = SparkEnv.createExecutorEnv(

http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 5d65731..e8b7fc0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -54,6 +54,9 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
     with RegisterExecutorResponse
 
+  case class UpdateDelegationTokens(tokens: Array[Byte])
+    extends CoarseGrainedClusterMessage
+
   // Executors to driver
   case class RegisterExecutor(
       executorId: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 424e43b..22d9c4c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -24,11 +24,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
 
-import org.apache.hadoop.security.UserGroupInformation
-
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -99,12 +95,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
-  // hadoop token manager used by some sub-classes (e.g. Mesos)
-  def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
-
-  // Hadoop delegation tokens to be sent to the executors.
-  val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
-
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -159,6 +149,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
           killExecutors(exec.toSeq, replace = true, force = true)
         }
+
+      case UpdateDelegationTokens(newDelegationTokens) =>
+        executorDataMap.values.foreach { ed =>
+          ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
+        }
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -236,7 +231,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         val reply = SparkAppConfig(
           sparkProperties,
           SparkEnv.get.securityManager.getIOEncryptionKey(),
-          hadoopDelegationCreds)
+          fetchHadoopDelegationTokens())
         context.reply(reply)
     }
 
@@ -686,18 +681,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     true
   }
 
-  protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
-    if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
-      hadoopDelegationTokenManager.map { manager =>
-        val creds = UserGroupInformation.getCurrentUser.getCredentials
-        val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-        manager.obtainDelegationTokens(hadoopConf, creds)
-        SparkHadoopUtil.get.serialize(creds)
-      }
-    } else {
-      None
-    }
-  }
+  protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {

http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
index 911a085..da71f8f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 104ed01..c392061 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -22,15 +22,16 @@ import java.util.{Collections, List => JList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.SchedulerDriver
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
+
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
@@ -58,8 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
     with org.apache.mesos.Scheduler with MesosSchedulerUtils {
 
-  override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
-    Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
+  private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
+    new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)
 
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2
@@ -772,6 +773,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       offer.getHostname
     }
   }
+
+  override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
+    if (UserGroupInformation.isSecurityEnabled) {
+      Some(hadoopDelegationTokenManager.getTokens())
+    } else {
+      None
+    }
+  }
 }
 
 private class Slave(val hostname: String) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
new file mode 100644
index 0000000..325dc17
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+    conf: SparkConf,
+    hadoopConfig: Configuration,
+    driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  require(driverEndpoint != null, "DriverEndpoint is not initialized")
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+    new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+    try {
+      val creds = UserGroupInformation.getCurrentUser.getCredentials
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+      logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
+      (SparkHadoopUtil.get.serialize(creds), rt)
+    } catch {
+      case e: Exception =>
+        logError(s"Failed to fetch Hadoop delegation tokens $e")
+        throw e
+    }
+  }
+
+  private val keytabFile: Option[String] = conf.get(config.KEYTAB)
+
+  scheduleTokenRenewal()
+
+  private def scheduleTokenRenewal(): Unit = {
+    if (keytabFile.isDefined) {
+      require(principal != null, "Principal is required for Keytab-based authentication")
+      logInfo(s"Using keytab: ${keytabFile.get} and principal $principal")
+    } else {
+      logInfo("Using ticket cache for Kerberos authentication, no token renewal.")
+      return
+    }
+
+    def scheduleRenewal(runnable: Runnable): Unit = {
+      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      if (remainingTime <= 0) {
+        logInfo("Credentials have expired, creating new ones now.")
+        runnable.run()
+      } else {
+        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+      }
+    }
+
+    val credentialRenewerRunnable =
+      new Runnable {
+        override def run(): Unit = {
+          try {
+            getNewDelegationTokens()
+            broadcastDelegationTokens(tokens)
+          } catch {
+            case e: Exception =>
+              // Log the error and try to write new tokens back in an hour
+              logWarning("Couldn't broadcast tokens, trying again in an hour", e)
+              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
+              return
+          }
+          scheduleRenewal(this)
+        }
+      }
+    scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getNewDelegationTokens(): Unit = {
+    logInfo(s"Attempting to login to KDC with principal ${principal}")
+    // Get new delegation tokens by logging in with a new UGI inspired by AMCredentialRenewer.scala
+    // Don't protect against keytabFile being empty because it's guarded above.
+    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile.get)
+    logInfo("Successfully logged into KDC")
+    val tempCreds = ugi.getCredentials
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] {
+      override def run(): Long = {
+        tokenManager.obtainDelegationTokens(hadoopConf, tempCreds)
+      }
+    })
+
+    val currTime = System.currentTimeMillis()
+    timeOfNextRenewal = if (nextRenewalTime <= currTime) {
+      logWarning(s"Next credential renewal time ($nextRenewalTime) is earlier than " +
+        s"current time ($currTime), which is unexpected, please check your credential renewal " +
+        "related configurations in the target services.")
+      currTime
+    } else {
+      SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75)
+    }
+    logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms")
+
+    // Add the temp credentials back to the original ones.
+    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
+    // update tokens for late or dynamically added executors
+    tokens = SparkHadoopUtil.get.serialize(tempCreds)
+  }
+
+  private def broadcastDelegationTokens(tokens: Array[Byte]) = {
+    logInfo("Sending new tokens to all executors")
+    driverEndpoint.send(UpdateDelegationTokens(tokens))
+  }
+
+  def getTokens(): Array[Byte] = {
+    tokens
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
index 68a2e9e..6134757 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.deploy.yarn.security
 
 import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -25,6 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
@@ -58,9 +59,8 @@ private[yarn] class AMCredentialRenewer(
 
   private var lastCredentialsFileSuffix = 0
 
-  private val credentialRenewer =
-    Executors.newSingleThreadScheduledExecutor(
-      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+  private val credentialRenewerThread: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
 
   private val hadoopUtil = YarnSparkHadoopUtil.get
 
@@ -70,7 +70,7 @@ private[yarn] class AMCredentialRenewer(
   private val freshHadoopConf =
     hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
 
-  @volatile private var timeOfNextRenewal = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+  @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
 
   /**
    * Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -95,7 +95,7 @@ private[yarn] class AMCredentialRenewer(
         runnable.run()
       } else {
         logInfo(s"Scheduling login from keytab in $remainingTime millis.")
-        credentialRenewer.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
       }
     }
 
@@ -111,7 +111,7 @@ private[yarn] class AMCredentialRenewer(
               // Log the error and try to write new tokens back in an hour
               logWarning("Failed to write out new credentials to HDFS, will try again in an " +
                 "hour! If this happens too often tasks will fail.", e)
-              credentialRenewer.schedule(this, 1, TimeUnit.HOURS)
+              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
               return
           }
           scheduleRenewal(this)
@@ -195,8 +195,9 @@ private[yarn] class AMCredentialRenewer(
     } else {
       // Next valid renewal time is about 75% of credential renewal time, and update time is
       // slightly later than valid renewal time (80% of renewal time).
-      timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong
-      ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong
+      timeOfNextRenewal =
+        SparkHadoopUtil.getDateOfNextUpdate(nearestNextRenewalTime, 0.75)
+      SparkHadoopUtil.getDateOfNextUpdate(nearestNextRenewalTime, 0.8)
     }
 
     // Add the temp credentials back to the original ones.
@@ -232,6 +233,6 @@ private[yarn] class AMCredentialRenewer(
   }
 
   def stop(): Unit = {
-    credentialRenewer.shutdown()
+    credentialRenewerThread.shutdown()
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org