You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/11/15 23:53:16 UTC
spark git commit: [SPARK-21842][MESOS] Support Kerberos ticket
renewal and creation in Mesos
Repository: spark
Updated Branches:
refs/heads/master 2014e7a78 -> 1e8233541
[SPARK-21842][MESOS] Support Kerberos ticket renewal and creation in Mesos
## What changes were proposed in this pull request?
tl;dr: Add a class, `MesosHadoopDelegationTokenManager`, that updates delegation tokens on a schedule on behalf of Spark drivers and broadcasts the renewed credentials to the executors.
## The problem
We recently added Kerberos support to Mesos-based Spark jobs as well as Secrets support to the Mesos Dispatcher (SPARK-16742 and SPARK-20812, respectively). However, delegation tokens have a defined expiration, which poses a problem for long-running Spark jobs (e.g. Spark Streaming applications). YARN solves this by scheduling a thread that renews the tokens when they are 75% of the way to expiration. It then writes the tokens to HDFS, using a monotonically increasing file suffix, for the executors to find.
## This solution
We replace the method in `CoarseGrainedSchedulerBackend` that used to discard the token renewal time with a protected method, `fetchHadoopDelegationTokens`. Individual cluster backends are now responsible for overriding this method to fetch tokens and manage their renewal. The delegation tokens themselves are still handed to executors by `CoarseGrainedSchedulerBackend`, as before.
In the case of Mesos, renewed Credentials are broadcast to the executors. This keeps all transfer of Credentials within Spark (as opposed to Spark-to-HDFS). It also requires neither writing Credentials to disk nor garbage-collecting old credential files.
## How was this patch tested?
Manually against a Kerberized HDFS cluster.
Thank you for the reviews.
Author: ArtRand <ar...@soe.ucsc.edu>
Closes #19272 from ArtRand/spark-21842-450-kerberos-ticket-renewal.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e823354
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e823354
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e823354
Branch: refs/heads/master
Commit: 1e82335413bc2384073ead0d6d581c862036d0f5
Parents: 2014e7a
Author: ArtRand <ar...@soe.ucsc.edu>
Authored: Wed Nov 15 15:53:05 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Nov 15 15:53:05 2017 -0800
----------------------------------------------------------------------
.../apache/spark/deploy/SparkHadoopUtil.scala | 28 +++-
.../security/HadoopDelegationTokenManager.scala | 3 +
.../executor/CoarseGrainedExecutorBackend.scala | 9 +-
.../cluster/CoarseGrainedClusterMessage.scala | 3 +
.../cluster/CoarseGrainedSchedulerBackend.scala | 30 +---
.../cluster/mesos/MesosClusterManager.scala | 2 +-
.../MesosCoarseGrainedSchedulerBackend.scala | 19 ++-
.../MesosHadoopDelegationTokenManager.scala | 157 +++++++++++++++++++
.../yarn/security/AMCredentialRenewer.scala | 21 +--
9 files changed, 228 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 1fa10ab..17c7319 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -140,13 +140,24 @@ class SparkHadoopUtil extends Logging {
if (!new File(keytabFilename).exists()) {
throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
} else {
- logInfo("Attempting to login to Kerberos" +
- s" using principal: ${principalName} and keytab: ${keytabFilename}")
+ logInfo("Attempting to login to Kerberos " +
+ s"using principal: ${principalName} and keytab: ${keytabFilename}")
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}
}
/**
+ * Deserializes `tokens` and adds them to the current user's credentials (adding or overwriting),
+ * after setting UserGroupInformation's configuration from the Hadoop conf built from `sparkConf`.
+ */
+ private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf): Unit = {
+ UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
+ val creds = deserialize(tokens)
+ logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+ addCurrentUserCredentials(creds)
+ }
+
+ /**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes read on r since t.
@@ -463,6 +474,19 @@ object SparkHadoopUtil {
}
/**
+ * Given an expiration date (e.g. for Hadoop Delegation Tokens), returns the date at which
+ * the given fraction of the time between now and the expiration date will have passed.
+ * Formula: current time + (fraction * (time until expiration)), in epoch milliseconds.
+ * @param expirationDate Drop-dead expiration date, in milliseconds since the epoch
+ * @param fraction fraction of the time until expiration that should have elapsed
+ * @return Date (epoch millis) when that fraction of the time until expiration has passed
+ */
+ private[spark] def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
+ val ct = System.currentTimeMillis
+ (ct + (fraction * (expirationDate - ct))).toLong
+ }
+
+ /**
* Returns a Configuration object with Spark configuration applied on top. Unlike
* the instance method, this will always return a Configuration instance, and not a
* cluster manager-specific type.
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 483d0de..116a686 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -109,6 +109,8 @@ private[spark] class HadoopDelegationTokenManager(
* Writes delegation tokens to creds. Delegation tokens are fetched from all registered
* providers.
*
+ * @param hadoopConf hadoop Configuration
+ * @param creds Credentials that will be updated in place (overwritten)
* @return Time after which the fetched delegation tokens should be renewed.
*/
def obtainDelegationTokens(
@@ -125,3 +127,4 @@ private[spark] class HadoopDelegationTokenManager(
}.foldLeft(Long.MaxValue)(math.min)
}
}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d27362a..acefc9d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -123,6 +123,10 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.stop()
}
}.start()
+
+ case UpdateDelegationTokens(tokenBytes) =>
+ logInfo(s"Received tokens of ${tokenBytes.length} bytes")
+ SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -219,9 +223,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}
- cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
- val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
- SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+ cfg.hadoopDelegationCreds.foreach { tokens =>
+ SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
}
val env = SparkEnv.createExecutorEnv(
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 5d65731..e8b7fc0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -54,6 +54,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
with RegisterExecutorResponse
+ case class UpdateDelegationTokens(tokens: Array[Byte])
+ extends CoarseGrainedClusterMessage // new serialized tokens; the driver relays them to executors
+
// Executors to driver
case class RegisterExecutor(
executorId: String,
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 424e43b..22d9c4c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -24,11 +24,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
-import org.apache.hadoop.security.UserGroupInformation
-
import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@@ -99,12 +95,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0
- // hadoop token manager used by some sub-classes (e.g. Mesos)
- def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
-
- // Hadoop delegation tokens to be sent to the executors.
- val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
-
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
@@ -159,6 +149,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
killExecutors(exec.toSeq, replace = true, force = true)
}
+
+ case UpdateDelegationTokens(newDelegationTokens) =>
+ executorDataMap.values.foreach { ed =>
+ ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
+ }
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -236,7 +231,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
- hadoopDelegationCreds)
+ fetchHadoopDelegationTokens())
context.reply(reply)
}
@@ -686,18 +681,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
true
}
- protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
- if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
- hadoopDelegationTokenManager.map { manager =>
- val creds = UserGroupInformation.getCurrentUser.getCredentials
- val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
- manager.obtainDelegationTokens(hadoopConf, creds)
- SparkHadoopUtil.get.serialize(creds)
- }
- } else {
- None
- }
- }
+ protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = None
}
private[spark] object CoarseGrainedSchedulerBackend {
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
index 911a085..da71f8f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster.mesos
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 104ed01..c392061 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -22,15 +22,16 @@ import java.util.{Collections, List => JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.SchedulerDriver
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
+
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
@@ -58,8 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with org.apache.mesos.Scheduler with MesosSchedulerUtils {
- override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
- Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
+ private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
+ new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)
// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
@@ -772,6 +773,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
offer.getHostname
}
}
+
+ override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
+ if (!UserGroupInformation.isSecurityEnabled) {
+ None
+ } else {
+ Some(hadoopDelegationTokenManager.getTokens())
+ }
+ }
}
private class Slave(val hostname: String) {
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
new file mode 100644
index 0000000..325dc17
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * Fetches Hadoop delegation tokens on behalf of the MesosCoarseGrainedSchedulerBackend and, when
+ * a keytab is configured, renews them on a schedule modeled after the YARN AMCredentialRenewer
+ * (later renewals happen once 75% of the time to the providers' next renewal time has passed).
+ * Renewed credentials (serialized as Tokens) are not written to HDFS; they are sent to the driver
+ * endpoint, which broadcasts them to running executors to overwrite their current credentials.
+ * NOTE(review): `tokens`/`timeOfNextRenewal` are shared across threads without @volatile, the
+ * renewer thread is never shut down, and a past renewal time re-runs renewal recursively.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+ conf: SparkConf,
+ hadoopConfig: Configuration,
+ driverEndpoint: RpcEndpointRef)
+ extends Logging {
+
+ require(driverEndpoint != null, "DriverEndpoint is not initialized")
+
+ private val credentialRenewerThread: ScheduledExecutorService =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+ private val tokenManager: HadoopDelegationTokenManager =
+ new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+ private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+ private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { // first renewal at `rt` itself
+ try {
+ val creds = UserGroupInformation.getCurrentUser.getCredentials
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+ logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
+ (SparkHadoopUtil.get.serialize(creds), rt)
+ } catch {
+ case e: Exception =>
+ logError(s"Failed to fetch Hadoop delegation tokens $e") // NOTE(review): stack trace dropped
+ throw e
+ }
+ }
+
+ private val keytabFile: Option[String] = conf.get(config.KEYTAB)
+
+ scheduleTokenRenewal()
+
+ private def scheduleTokenRenewal(): Unit = { // no renewal is scheduled without a keytab
+ if (keytabFile.isDefined) {
+ require(principal != null, "Principal is required for Keytab-based authentication")
+ logInfo(s"Using keytab: ${keytabFile.get} and principal $principal")
+ } else {
+ logInfo("Using ticket cache for Kerberos authentication, no token renewal.")
+ return
+ }
+
+ def scheduleRenewal(runnable: Runnable): Unit = {
+ val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+ if (remainingTime <= 0) {
+ logInfo("Credentials have expired, creating new ones now.")
+ runnable.run() // runs synchronously on the calling thread
+ } else {
+ logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+ credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ val credentialRenewerRunnable =
+ new Runnable {
+ override def run(): Unit = {
+ try {
+ getNewDelegationTokens()
+ broadcastDelegationTokens(tokens)
+ } catch {
+ case e: Exception =>
+ // Log the error and retry fetching and broadcasting tokens in an hour
+ logWarning("Couldn't broadcast tokens, trying again in an hour", e)
+ credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
+ return
+ }
+ scheduleRenewal(this)
+ }
+ }
+ scheduleRenewal(credentialRenewerRunnable)
+ }
+
+ private def getNewDelegationTokens(): Unit = {
+ logInfo(s"Attempting to login to KDC with principal ${principal}")
+ // Obtain fresh tokens by logging in from the keytab with a new UGI (as AMCredentialRenewer does).
+ // keytabFile.get is safe: only reached after scheduleTokenRenewal checked keytabFile.isDefined.
+ val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile.get)
+ logInfo("Successfully logged into KDC")
+ val tempCreds = ugi.getCredentials
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] {
+ override def run(): Long = {
+ tokenManager.obtainDelegationTokens(hadoopConf, tempCreds)
+ }
+ })
+
+ val currTime = System.currentTimeMillis()
+ timeOfNextRenewal = if (nextRenewalTime <= currTime) {
+ logWarning(s"Next credential renewal time ($nextRenewalTime) is earlier than " +
+ s"current time ($currTime), which is unexpected, please check your credential renewal " +
+ "related configurations in the target services.")
+ currTime // NOTE(review): makes scheduleRenewal re-run renewal immediately
+ } else {
+ SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75)
+ }
+ logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms")
+
+ // Merge the freshly obtained credentials into the current user's credentials.
+ UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
+ // Cache serialized tokens so getTokens() serves them to late or dynamically added executors.
+ tokens = SparkHadoopUtil.get.serialize(tempCreds)
+ }
+
+ private def broadcastDelegationTokens(tokens: Array[Byte]) = { // driver relays to executors
+ logInfo("Sending new tokens to all executors")
+ driverEndpoint.send(UpdateDelegationTokens(tokens))
+ }
+
+ def getTokens(): Array[Byte] = { // latest serialized tokens held by this manager
+ tokens
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/1e823354/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
index 68a2e9e..6134757 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn.security
import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -25,6 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
@@ -58,9 +59,8 @@ private[yarn] class AMCredentialRenewer(
private var lastCredentialsFileSuffix = 0
- private val credentialRenewer =
- Executors.newSingleThreadScheduledExecutor(
- ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+ private val credentialRenewerThread: ScheduledExecutorService =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
private val hadoopUtil = YarnSparkHadoopUtil.get
@@ -70,7 +70,7 @@ private[yarn] class AMCredentialRenewer(
private val freshHadoopConf =
hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
- @volatile private var timeOfNextRenewal = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+ @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
/**
* Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -95,7 +95,7 @@ private[yarn] class AMCredentialRenewer(
runnable.run()
} else {
logInfo(s"Scheduling login from keytab in $remainingTime millis.")
- credentialRenewer.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+ credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
}
}
@@ -111,7 +111,7 @@ private[yarn] class AMCredentialRenewer(
// Log the error and try to write new tokens back in an hour
logWarning("Failed to write out new credentials to HDFS, will try again in an " +
"hour! If this happens too often tasks will fail.", e)
- credentialRenewer.schedule(this, 1, TimeUnit.HOURS)
+ credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
return
}
scheduleRenewal(this)
@@ -195,8 +195,9 @@ private[yarn] class AMCredentialRenewer(
} else {
// Next valid renewal time is about 75% of credential renewal time, and update time is
// slightly later than valid renewal time (80% of renewal time).
- timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong
- ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong
+ timeOfNextRenewal =
+ SparkHadoopUtil.getDateOfNextUpdate(nearestNextRenewalTime, 0.75)
+ SparkHadoopUtil.getDateOfNextUpdate(nearestNextRenewalTime, 0.8)
}
// Add the temp credentials back to the original ones.
@@ -232,6 +233,6 @@ private[yarn] class AMCredentialRenewer(
}
def stop(): Unit = {
- credentialRenewer.shutdown()
+ credentialRenewerThread.shutdown()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org