Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2018/02/22 02:20:33 UTC

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/20657

    [SPARK-23361][yarn] Allow AM to restart after initial tokens expire.

    Currently, the Spark AM relies on the initial set of tokens created by
    the submission client to be able to talk to HDFS and other services that
    require delegation tokens. This means that after those tokens expire, a
    new AM will fail to start (e.g. when there is an application failure and
    re-attempts are enabled).
    
    This PR makes it so that the first thing the AM does when the user provides
    a principal and keytab is to create new delegation tokens for use. This
    makes sure that the AM can be started irrespective of how old the original
    token set is. It also allows all of the token management to be done by the
    AM - there is no need for the submission client to set configuration values
    to tell the AM when to renew tokens.
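
    As a rough sketch of that startup flow (a hedged illustration, not the
    patch itself: loginUserFromKeytabAndReturnUGI and doAs are standard Hadoop
    UGI APIs, and fetchTokens stands in for the provider calls made by
    YARNHadoopDelegationTokenManager):

        import java.security.PrivilegedExceptionAction

        import org.apache.hadoop.security.{Credentials, UserGroupInformation}

        def loginAndCreateTokens(
            principal: String,
            keytab: String,
            fetchTokens: Credentials => Unit): UserGroupInformation = {
          // Log in with the user-provided keytab instead of relying on the
          // (possibly expired) tokens shipped by the submission client.
          val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
          ugi.doAs(new PrivilegedExceptionAction[Unit] {
            override def run(): Unit = {
              val creds = new Credentials()
              fetchTokens(creds) // e.g. the token manager's provider loop
              ugi.addCredentials(creds)
            }
          })
          ugi
        }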
    
    Note that even though in this case the AM will not be using the delegation
    tokens created by the submission client, those tokens still need to be provided
    to YARN, since they are used to do log aggregation.
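
    (For context, a minimal sketch of how a submission client hands tokens to
    YARN, using standard Hadoop/YARN APIs; the actual Spark call site is
    Client.setupSecurityToken, mentioned later in this thread:)

        import java.nio.ByteBuffer

        import org.apache.hadoop.io.DataOutputBuffer
        import org.apache.hadoop.security.Credentials
        import org.apache.hadoop.yarn.api.records.ContainerLaunchContext

        // Serialize the submission client's credentials and attach them to
        // the AM container; YARN uses these tokens for log aggregation.
        def setTokens(creds: Credentials, amContainer: ContainerLaunchContext): Unit = {
          val dob = new DataOutputBuffer()
          creds.writeTokenStorageToStream(dob)
          amContainer.setTokens(ByteBuffer.wrap(dob.getData, 0, dob.getLength))
        }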
    
    To be able to re-use the code in AMCredentialRenewer for the above
    purposes, I refactored that class a bit so that it can fetch tokens into
    a pre-defined UGI, instead of always logging in.
    
    Another issue with re-attempts is that, after the fix that allows the AM
    to restart correctly, new executors would get confused about when to
    update credentials, because the credential updater used the update time
    initially set up by the submission code. This could make the executor
    fail to update credentials in time, since that value would be very out
    of date in the situation described in the bug.
    
    To fix that, I changed the YARN code to use the new RPC-based mechanism
    for distributing tokens to executors. This allowed the old credential
    updater code to be removed, and a lot of code in the renewer to be
    simplified.
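
    A hedged sketch of the driver side of that RPC path (UpdateDelegationTokens
    and addDelegationTokens appear in the diffs below; the helper name and the
    executor list are assumptions for illustration):

        import org.apache.spark.SparkConf
        import org.apache.spark.deploy.SparkHadoopUtil
        import org.apache.spark.rpc.RpcEndpointRef
        import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens

        // Apply freshly received tokens locally, then fan them out to the
        // registered executor endpoints.
        def handleTokenUpdate(
            sparkConf: SparkConf,
            tokens: Array[Byte],
            executors: Iterable[RpcEndpointRef]): Unit = {
          SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
          executors.foreach(_.send(UpdateDelegationTokens(tokens)))
        }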
    
    I also made two currently hardcoded values (the renewal time ratio, and
    the retry wait) configurable; while these probably never need to be set
    by anyone in a production environment, they help with testing; that's also
    why they're not documented.
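
    Roughly, the two settings are used like this (a hedged fragment assuming
    the surrounding renewer context; nextRenewalTime is an assumed value
    coming from the token providers, and the config names are those added in
    this PR):

        // Schedule the next fetch at 75% (by default) of the renewal interval.
        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
        val delay = ((nextRenewalTime - System.currentTimeMillis()) * ratio).toLong
        scheduleRenewal(delay)

        // On failure, wait for the configured interval (default "1h") and retry.
        val retryDelay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
        scheduleRenewal(retryDelay)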
    
    Tested on a real cluster with a specially crafted application that exercises
    this functionality: checked proper access to HDFS, Hive and HBase in cluster
    mode with token renewal on and AM restarts. Verified that things still work
    in client mode too.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-23361

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20657.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20657
    
----
commit 2c3448dd3aa4071234a65a1c9317b1a3c4fe8d24
Author: Marcelo Vanzin <va...@...>
Date:   2018-02-09T22:33:05Z

    [SPARK-23361][yarn] Allow AM to restart after initial tokens expire.
    
    Currently, the Spark AM relies on the initial set of tokens created by
    the submission client to be able to talk to HDFS and other services that
    require delegation tokens. This means that after those tokens expire, a
    new AM will fail to start (e.g. when there is an application failure and
    re-attempts are enabled).
    
    This PR makes it so that the first thing the AM does when the user provides
    a principal and keytab is to create new delegation tokens for use. This
    makes sure that the AM can be started irrespective of how old the original
    token set is. It also allows all of the token management to be done by the
    AM - there is no need for the submission client to set configuration values
    to tell the AM when to renew tokens.
    
    Note that even though in this case the AM will not be using the delegation
    tokens created by the submission client, those tokens still need to be provided
    to YARN, since they are used to do log aggregation.
    
    To be able to re-use the code in AMCredentialRenewer for the above
    purposes, I refactored that class a bit so that it can fetch tokens into
    a pre-defined UGI, instead of always logging in.
    
    Another issue with re-attempts is that, after the fix that allows the AM
    to restart correctly, new executors would get confused about when to
    update credentials, because the credential updater used the update time
    initially set up by the submission code. This could make the executor
    fail to update credentials in time, since that value would be very out
    of date in the situation described in the bug.
    
    To fix that, I changed the YARN code to use the new RPC-based mechanism
    for distributing tokens to executors. This allowed the old credential
    updater code to be removed, and a lot of code in the renewer to be
    simplified.
    
    I also made two currently hardcoded values (the renewal time ratio, and
    the retry wait) configurable; while these probably never need to be set
    by anyone in a production environment, they help with testing; that's also
    why they're not documented.
    
    Tested on a real cluster with a specially crafted application that exercises
    this functionality: checked proper access to HDFS, Hive and HBase in cluster
    mode with token renewal on and AM restarts. Verified that things still work
    in client mode too.

----


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #87621 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87621/testReport)** for PR 20657 at commit [`2c3448d`](https://github.com/apache/spark/commit/2c3448dd3aa4071234a65a1c9317b1a3c4fe8d24).


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Merged build finished. Test PASSed.


---





[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Ping (also adding @squito to try to move this forward).


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r173522390
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
    @@ -18,221 +18,160 @@ package org.apache.spark.deploy.yarn.security
     
     import java.security.PrivilegedExceptionAction
     import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.{FileSystem, Path}
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
     import org.apache.spark.deploy.SparkHadoopUtil
    -import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
     import org.apache.spark.deploy.yarn.config._
     import org.apache.spark.internal.Logging
     import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
     import org.apache.spark.util.ThreadUtils
     
     /**
    - * The following methods are primarily meant to make sure long-running apps like Spark
    - * Streaming apps can run without interruption while accessing secured services. The
    - * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
    - * This method wakes up a thread that logs into the KDC
    - * once 75% of the renewal interval of the original credentials used for the container
    - * has elapsed. It then obtains new credentials and writes them to HDFS in a
    - * pre-specified location - the prefix of which is specified in the sparkConf by
    - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
    - * - each update goes to a new file, with a monotonically increasing suffix), also the
    - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
    - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
    + * A manager tasked with periodically updating delegation tokens needed by the application.
      *
    - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
    - * called once 80% of the validity of the original credentials has elapsed. At that time the
    - * executor finds the credentials file with the latest timestamp and checks if it has read those
    - * credentials before (by keeping track of the suffix of the last file it read). If a new file has
    - * appeared, it will read the credentials and update the currently running UGI with it. This
    - * process happens again once 80% of the validity of this has expired.
    + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      */
     private[yarn] class AMCredentialRenewer(
         sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
    +    hadoopConf: Configuration) extends Logging {
     
    -  private var lastCredentialsFileSuffix = 0
    +  private val principal = sparkConf.get(PRINCIPAL).get
    +  private val keytab = sparkConf.get(KEYTAB).get
    +  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  private val credentialRenewerThread: ScheduledExecutorService =
    +  private val renewalExecutor: ScheduledExecutorService =
         ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
     
    -  private val hadoopUtil = SparkHadoopUtil.get
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
     
    -  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
    -  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
    -  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
    -  private val freshHadoopConf =
    --- End diff --
    
    Because the new code doesn't need to write to HDFS at all.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #87605 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87605/testReport)** for PR 20657 at commit [`2c3448d`](https://github.com/apache/spark/commit/2c3448dd3aa4071234a65a1c9317b1a3c4fe8d24).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r173383032
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -520,4 +520,16 @@ package object config {
           .checkValue(v => v > 0, "The threshold should be positive.")
           .createWithDefault(10000000)
     
    +  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
    +    ConfigBuilder("spark.security.credentials.renewalRatio")
    +      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
    +      .doubleConf
    +      .createWithDefault(0.75d)
    +
    +  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
    +    ConfigBuilder("spark.security.credentials.retryWait")
    +      .doc("How long to wait before retrying to fetch new credentials after a failure.")
    +      .timeConf(TimeUnit.SECONDS)
    +      .createWithDefaultString("1h")
    --- End diff --
    
    Isn't "1h" too long if the token expiration time is short, for example 8 hours or even less? That could make the next retry fail outright.
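
    A quick worked example under an assumed 8-hour token lifetime:

        val lifetime     = 8.0             // hours (assumed)
        val firstAttempt = 0.75 * lifetime // renewal fires at 6h
        val retryWait    = 1.0             // default "1h"
        // If the 6h attempt fails, retries land at 7h and 8h, leaving only
        // two more chances before the original tokens expire.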


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r173078047
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
    @@ -18,221 +18,160 @@ package org.apache.spark.deploy.yarn.security
     
     import java.security.PrivilegedExceptionAction
     import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.{FileSystem, Path}
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
     import org.apache.spark.deploy.SparkHadoopUtil
    -import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
     import org.apache.spark.deploy.yarn.config._
     import org.apache.spark.internal.Logging
     import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
     import org.apache.spark.util.ThreadUtils
     
     /**
    - * The following methods are primarily meant to make sure long-running apps like Spark
    - * Streaming apps can run without interruption while accessing secured services. The
    - * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
    - * This method wakes up a thread that logs into the KDC
    - * once 75% of the renewal interval of the original credentials used for the container
    - * has elapsed. It then obtains new credentials and writes them to HDFS in a
    - * pre-specified location - the prefix of which is specified in the sparkConf by
    - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
    - * - each update goes to a new file, with a monotonically increasing suffix), also the
    - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
    - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
    + * A manager tasked with periodically updating delegation tokens needed by the application.
      *
    - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
    - * called once 80% of the validity of the original credentials has elapsed. At that time the
    - * executor finds the credentials file with the latest timestamp and checks if it has read those
    - * credentials before (by keeping track of the suffix of the last file it read). If a new file has
    - * appeared, it will read the credentials and update the currently running UGI with it. This
    - * process happens again once 80% of the validity of this has expired.
    + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      */
     private[yarn] class AMCredentialRenewer(
         sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
    +    hadoopConf: Configuration) extends Logging {
     
    -  private var lastCredentialsFileSuffix = 0
    +  private val principal = sparkConf.get(PRINCIPAL).get
    +  private val keytab = sparkConf.get(KEYTAB).get
    +  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  private val credentialRenewerThread: ScheduledExecutorService =
    +  private val renewalExecutor: ScheduledExecutorService =
         ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
     
    -  private val hadoopUtil = SparkHadoopUtil.get
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
     
    -  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
    -  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
    -  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
    -  private val freshHadoopConf =
    -    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
    +  private val renewalTask = new Runnable() {
    +    override def run(): Unit = {
    +      updateTokensTask()
    +    }
    +  }
     
    -  @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
    +  def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
     
       /**
    -   * Schedule a login from the keytab and principal set using the --principal and --keytab
    -   * arguments to spark-submit. This login happens only when the credentials of the current user
    -   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
    -   * SparkConf to do the login. This method is a no-op in non-YARN mode.
    +   * Start the token renewer. Upon start, the renewer will:
        *
    +   * - log in the configured user, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
        */
    -  private[spark] def scheduleLoginFromKeytab(): Unit = {
    -    val principal = sparkConf.get(PRINCIPAL).get
    -    val keytab = sparkConf.get(KEYTAB).get
    -
    -    /**
    -     * Schedule re-login and creation of new credentials. If credentials have already expired, this
    -     * method will synchronously create new ones.
    -     */
    -    def scheduleRenewal(runnable: Runnable): Unit = {
    -      // Run now!
    -      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
    -      if (remainingTime <= 0) {
    -        logInfo("Credentials have expired, creating new ones now.")
    -        runnable.run()
    -      } else {
    -        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
    -        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
    +  def start(): UserGroupInformation = {
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
           }
         }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
     
    -    // This thread periodically runs on the AM to update the credentials on HDFS.
    -    val credentialRenewerRunnable =
    -      new Runnable {
    -        override def run(): Unit = {
    -          try {
    -            writeNewCredentialsToHDFS(principal, keytab)
    -            cleanupOldFiles()
    -          } catch {
    -            case e: Exception =>
    -              // Log the error and try to write new tokens back in an hour
    -              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
    -                "hour! If this happens too often tasks will fail.", e)
    -              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
    -              return
    -          }
    -          scheduleRenewal(this)
    -        }
    -      }
    -    // Schedule update of credentials. This handles the case of updating the credentials right now
    -    // as well, since the renewal interval will be 0, and the thread will get scheduled
    -    // immediately.
    -    scheduleRenewal(credentialRenewerRunnable)
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    // Transfer the original user's tokens to the new user, since that's needed to connect to
    +    // YARN. Explicitly avoid overwriting tokens that already exist in the current user's
    +    // credentials, since those were freshly obtained above (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    renewalExecutor.shutdown()
    +  }
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
       }
     
    -  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
    -  // least numFilesToKeep files are kept for safety
    -  private def cleanupOldFiles(): Unit = {
    -    import scala.concurrent.duration._
    +  /**
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
    +   */
    +  private def updateTokensTask(): Unit = {
         try {
    -      val remoteFs = FileSystem.get(freshHadoopConf)
    -      val credentialsPath = new Path(credentialsFile)
    -      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
    -      hadoopUtil.listFilesSorted(
    -        remoteFs, credentialsPath.getParent,
    -        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    -        .dropRight(numFilesToKeep)
    -        .takeWhile(_.getModificationTime < thresholdTime)
    -        .foreach(x => remoteFs.delete(x.getPath, true))
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
    +      } else {
    +        // This shouldn't really happen, since the driver should register way before tokens expire
    +        // (or the AM should time out the application).
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
    +      }
         } catch {
    -      // Such errors are not fatal, so don't throw. Make sure they are logged though
           case e: Exception =>
    -        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
    -          "such warnings there may be an issue with your HDFS cluster.", e)
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
         }
       }
     
    -  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
    -    // Keytab is copied by YARN to the working directory of the AM, so full path is
    -    // not needed.
    -
    -    // HACK:
    --- End diff --
    
    Do we still need this hack to work?


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172954706
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
    @@ -93,11 +93,24 @@ private[spark] class Client(
     
       private val distCacheMgr = new ClientDistributedCacheManager()
     
    -  private var loginFromKeytab = false
    -  private var principal: String = null
    -  private var keytab: String = null
    -  private var credentials: Credentials = null
    -  private var amKeytabFileName: String = null
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +  private val loginFromKeytab = principal != null
    +  private val amKeytabFileName: String = {
    +    require((principal == null) == (keytab == null),
    +      "Both principal and keytab must be defined, or neither.")
    +    if (loginFromKeytab) {
    +      logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab")
    +      // Generate a file name that can be used for the keytab file, that does not conflict
    +      // with any user file.
    +      new File(keytab).getName() + "-" + UUID.randomUUID().toString
    +    } else {
    +      null
    +    }
    +  }
    +
    +  // Defensive copy of the credentials
    +  private val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
    --- End diff --
    
    This appears to be unused. Did you mean to use it in `setupSecurityToken()`? I'm not really sure what you're defending against with the copy; perhaps that should go in the comment as well ... I see it was just in the old code though.
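
    For reference, a small sketch of what a defensive copy guards against
    (Credentials has a copy constructor in Hadoop; the scenario is assumed):

        import org.apache.hadoop.security.{Credentials, UserGroupInformation}

        val live = UserGroupInformation.getCurrentUser.getCredentials // shared, mutable
        val snapshot = new Credentials(live)                          // isolated copy
        // Tokens added to `live` later (e.g. by a renewal thread) won't show
        // up in `snapshot`, so the holder sees a stable submission-time view.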


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1011/
    Test PASSed.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1646/
    Test PASSed.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Thanks, merging to master branch!


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88427/
    Test PASSed.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    I'm really sorry about the delay @vanzin @squito. I will take another look today and get back to you.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    retest this please


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    @jerryshao I know you said you wanted to take a deeper look, but it's been a while. Otherwise I'll merge this in the next day or two.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1296/
    Test PASSed.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #87621 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87621/testReport)** for PR 20657 at commit [`2c3448d`](https://github.com/apache/spark/commit/2c3448dd3aa4071234a65a1c9317b1a3c4fe8d24).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---





[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Will review it soon. 😄 


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #87605 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87605/testReport)** for PR 20657 at commit [`2c3448d`](https://github.com/apache/spark/commit/2c3448dd3aa4071234a65a1c9317b1a3c4fe8d24).


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172321966
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
    @@ -1009,7 +987,7 @@ private[spark] class Client(
       }
     
       def setupCredentials(): Unit = {
    -    loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
    +    loginFromKeytab = sparkConf.contains(PRINCIPAL)
    --- End diff --
    
    If a user only specifies a keytab but no principal, I don't think this will fail in a friendly way. It will be a no-op, so submission will succeed, and then in ApplicationMaster / AMCredentialRenewer you'll get an error from `sparkConf.get(PRINCIPAL).get`.
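
    The updated Client.scala quoted earlier in this thread guards against this
    up front, roughly:

        require((principal == null) == (keytab == null),
          "Both principal and keytab must be defined, or neither.")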


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Ping


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87983/
    Test PASSed.


---





[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    I plan to open a bug for cleaning things up after this code goes in.


---





[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172319244
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
    @@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security
     
     import java.security.PrivilegedExceptionAction
     import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.{FileSystem, Path}
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
     import org.apache.spark.deploy.SparkHadoopUtil
    -import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
     import org.apache.spark.deploy.yarn.config._
     import org.apache.spark.internal.Logging
     import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
     import org.apache.spark.util.ThreadUtils
     
     /**
    - * The following methods are primarily meant to make sure long-running apps like Spark
    - * Streaming apps can run without interruption while accessing secured services. The
    - * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
    - * This method wakes up a thread that logs into the KDC
    - * once 75% of the renewal interval of the original credentials used for the container
    - * has elapsed. It then obtains new credentials and writes them to HDFS in a
    - * pre-specified location - the prefix of which is specified in the sparkConf by
    - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
    - * - each update goes to a new file, with a monotonically increasing suffix), also the
    - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
    - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
    + * A manager tasked with periodically updating delegation tokens needed by the application.
      *
    - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
    - * called once 80% of the validity of the original credentials has elapsed. At that time the
    - * executor finds the credentials file with the latest timestamp and checks if it has read those
    - * credentials before (by keeping track of the suffix of the last file it read). If a new file has
    - * appeared, it will read the credentials and update the currently running UGI with it. This
    - * process happens again once 80% of the validity of this has expired.
    + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are both added to the current user, and also sent to the Spark driver
    + * once it's registered with the AM. The driver is tasked with distributing the tokens to other
    + * processes that might need them.
      */
     private[yarn] class AMCredentialRenewer(
         sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
    +    hadoopConf: Configuration) extends Logging {
     
    -  private var lastCredentialsFileSuffix = 0
    +  private val principal = sparkConf.get(PRINCIPAL).get
    +  private val keytab = sparkConf.get(KEYTAB).get
    +  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  private val credentialRenewerThread: ScheduledExecutorService =
    +  private val renewalExecutor: ScheduledExecutorService =
         ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
     
    -  private val hadoopUtil = SparkHadoopUtil.get
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
     
    -  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
    -  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
    -  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
    -  private val freshHadoopConf =
    -    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
    +  private val renewalTask = new Runnable() {
    +    override def run(): Unit = {
    +      updateTokensTask()
    +    }
    +  }
     
    -  @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
    +  def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
     
       /**
    -   * Schedule a login from the keytab and principal set using the --principal and --keytab
    -   * arguments to spark-submit. This login happens only when the credentials of the current user
    -   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
    -   * SparkConf to do the login. This method is a no-op in non-YARN mode.
    +   * Start the token renewer. Upon start, the renewer will:
    +   *
    +   * - log in the configured user, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
        *
    +   * @return The newly logged in user.
        */
    -  private[spark] def scheduleLoginFromKeytab(): Unit = {
    -    val principal = sparkConf.get(PRINCIPAL).get
    -    val keytab = sparkConf.get(KEYTAB).get
    -
    -    /**
    -     * Schedule re-login and creation of new credentials. If credentials have already expired, this
    -     * method will synchronously create new ones.
    -     */
    -    def scheduleRenewal(runnable: Runnable): Unit = {
    -      // Run now!
    -      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
    -      if (remainingTime <= 0) {
    -        logInfo("Credentials have expired, creating new ones now.")
    -        runnable.run()
    -      } else {
    -        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
    -        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
    +  def start(): UserGroupInformation = {
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
           }
         }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
     
    -    // This thread periodically runs on the AM to update the credentials on HDFS.
    -    val credentialRenewerRunnable =
    -      new Runnable {
    -        override def run(): Unit = {
    -          try {
    -            writeNewCredentialsToHDFS(principal, keytab)
    -            cleanupOldFiles()
    -          } catch {
    -            case e: Exception =>
    -              // Log the error and try to write new tokens back in an hour
    -              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
    -                "hour! If this happens too often tasks will fail.", e)
    -              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
    -              return
    -          }
    -          scheduleRenewal(this)
    -        }
    -      }
    -    // Schedule update of credentials. This handles the case of updating the credentials right now
    -    // as well, since the renewal interval will be 0, and the thread will get scheduled
    -    // immediately.
    -    scheduleRenewal(credentialRenewerRunnable)
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    // Transfer the original user's tokens to the new user, since that's needed to connect to
    +    // YARN. Explicitly avoid overwriting tokens that already exist in the current user's
    +    // credentials, since those were freshly obtained above (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    renewalExecutor.shutdown()
       }
     
    -  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
    -  // least numFilesToKeep files are kept for safety
    -  private def cleanupOldFiles(): Unit = {
    -    import scala.concurrent.duration._
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
    +  }
    +
    +  /**
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
    +   */
    +  private def updateTokensTask(): Unit = {
         try {
    -      val remoteFs = FileSystem.get(freshHadoopConf)
    -      val credentialsPath = new Path(credentialsFile)
    -      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
    -      hadoopUtil.listFilesSorted(
    -        remoteFs, credentialsPath.getParent,
    -        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    -        .dropRight(numFilesToKeep)
    -        .takeWhile(_.getModificationTime < thresholdTime)
    -        .foreach(x => remoteFs.delete(x.getPath, true))
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
    +      } else {
    +        // This shouldn't really happen, since the driver should register way before tokens expire
    +        // (or the AM should time out the application).
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
    +      }
         } catch {
    -      // Such errors are not fatal, so don't throw. Make sure they are logged though
           case e: Exception =>
    -        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
    -          "such warnings there may be an issue with your HDFS cluster.", e)
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again ${UIUtils.formatDuration(delay)}! " +
    --- End diff --
    
    nit: will try again _in_ ...


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87605/
    Test FAILed.


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172322650
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -520,4 +520,16 @@ package object config {
           .checkValue(v => v > 0, "The threshold should be positive.")
           .createWithDefault(10000000)
     
    +  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
    +    ConfigBuilder("spark.security.credentials.renewalRatio")
    +      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
    +      .doubleConf
    +      .createWithDefault(0.75d)
    +
    +  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
    +    ConfigBuilder("spark.security.credentials.retryWait")
    +      .doc("How long to wait before retrying to fetch new credentials after a failure.")
    +      .timeConf(TimeUnit.SECONDS)
    +      .createWithDefaultString("1h")
    --- End diff --
    
    `.internal()` for both


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r175638637
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
    @@ -105,7 +105,8 @@ private[spark] class MesosHadoopDelegationTokenManager(
                 case e: Exception =>
                   // Log the error and try to write new tokens back in an hour
                   logWarning("Couldn't broadcast tokens, trying again in an hour", e)
    --- End diff --
    
    Shall we update the log message to reflect the configured wait time?
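
    A possible rewording (just a sketch, assuming the Mesos manager reads the new `CREDENTIALS_RENEWAL_RETRY_WAIT` config the same way the AM does; `conf` stands in for the manager's SparkConf):

    ```
        case e: Exception =>
          // Sketch: report the configured retry wait instead of a hard-coded "an hour".
          val delay = TimeUnit.SECONDS.toMillis(conf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
          logWarning(s"Couldn't broadcast tokens, trying again in ${UIUtils.formatDuration(delay)}", e)
    ```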


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88066/
    Test PASSed.


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172323592
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
    @@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security
     
     import java.security.PrivilegedExceptionAction
     import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.{FileSystem, Path}
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
     import org.apache.spark.deploy.SparkHadoopUtil
    -import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
     import org.apache.spark.deploy.yarn.config._
     import org.apache.spark.internal.Logging
     import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
     import org.apache.spark.util.ThreadUtils
     
     /**
    - * The following methods are primarily meant to make sure long-running apps like Spark
    - * Streaming apps can run without interruption while accessing secured services. The
    - * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
    - * This method wakes up a thread that logs into the KDC
    - * once 75% of the renewal interval of the original credentials used for the container
    - * has elapsed. It then obtains new credentials and writes them to HDFS in a
    - * pre-specified location - the prefix of which is specified in the sparkConf by
    - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
    - * - each update goes to a new file, with a monotonically increasing suffix), also the
    - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
    - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
    + * A manager tasked with periodically updating delegation tokens needed by the application.
      *
    - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
    - * called once 80% of the validity of the original credentials has elapsed. At that time the
    - * executor finds the credentials file with the latest timestamp and checks if it has read those
    - * credentials before (by keeping track of the suffix of the last file it read). If a new file has
    - * appeared, it will read the credentials and update the currently running UGI with it. This
    - * process happens again once 80% of the validity of this has expired.
    + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are both added to the current user, and also sent to the Spark driver
    + * once it's registered with the AM. The driver is tasked with distributing the tokens to other
    + * processes that might need them.
      */
     private[yarn] class AMCredentialRenewer(
         sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
    +    hadoopConf: Configuration) extends Logging {
     
    -  private var lastCredentialsFileSuffix = 0
    +  private val principal = sparkConf.get(PRINCIPAL).get
    +  private val keytab = sparkConf.get(KEYTAB).get
    +  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  private val credentialRenewerThread: ScheduledExecutorService =
    +  private val renewalExecutor: ScheduledExecutorService =
         ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
     
    -  private val hadoopUtil = SparkHadoopUtil.get
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
     
    -  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
    -  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
    -  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
    -  private val freshHadoopConf =
    -    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
    +  private val renewalTask = new Runnable() {
    +    override def run(): Unit = {
    +      updateTokensTask()
    +    }
    +  }
     
    -  @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
    +  def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
     
       /**
    -   * Schedule a login from the keytab and principal set using the --principal and --keytab
    -   * arguments to spark-submit. This login happens only when the credentials of the current user
    -   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
    -   * SparkConf to do the login. This method is a no-op in non-YARN mode.
    +   * Start the token renewer. Upon start, the renewer will:
    +   *
    +   * - log in the configured user, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
        *
    +   * @return The newly logged in user.
        */
    -  private[spark] def scheduleLoginFromKeytab(): Unit = {
    -    val principal = sparkConf.get(PRINCIPAL).get
    -    val keytab = sparkConf.get(KEYTAB).get
    -
    -    /**
    -     * Schedule re-login and creation of new credentials. If credentials have already expired, this
    -     * method will synchronously create new ones.
    -     */
    -    def scheduleRenewal(runnable: Runnable): Unit = {
    -      // Run now!
    -      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
    -      if (remainingTime <= 0) {
    -        logInfo("Credentials have expired, creating new ones now.")
    -        runnable.run()
    -      } else {
    -        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
    -        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
    +  def start(): UserGroupInformation = {
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
           }
         }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
     
    -    // This thread periodically runs on the AM to update the credentials on HDFS.
    -    val credentialRenewerRunnable =
    -      new Runnable {
    -        override def run(): Unit = {
    -          try {
    -            writeNewCredentialsToHDFS(principal, keytab)
    -            cleanupOldFiles()
    -          } catch {
    -            case e: Exception =>
    -              // Log the error and try to write new tokens back in an hour
    -              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
    -                "hour! If this happens too often tasks will fail.", e)
    -              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
    -              return
    -          }
    -          scheduleRenewal(this)
    -        }
    -      }
    -    // Schedule update of credentials. This handles the case of updating the credentials right now
    -    // as well, since the renewal interval will be 0, and the thread will get scheduled
    -    // immediately.
    -    scheduleRenewal(credentialRenewerRunnable)
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    // Transfer the original user's tokens to the new user, since that's needed to connect to
    +    // YARN. Explicitly avoid overwriting tokens that already exist in the current user's
    +    // credentials, since those were freshly obtained above (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    renewalExecutor.shutdown()
       }
     
    -  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
    -  // least numFilesToKeep files are kept for safety
    -  private def cleanupOldFiles(): Unit = {
    -    import scala.concurrent.duration._
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
    +  }
    +
    +  /**
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
    +   */
    +  private def updateTokensTask(): Unit = {
         try {
    -      val remoteFs = FileSystem.get(freshHadoopConf)
    -      val credentialsPath = new Path(credentialsFile)
    -      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
    -      hadoopUtil.listFilesSorted(
    -        remoteFs, credentialsPath.getParent,
    -        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    -        .dropRight(numFilesToKeep)
    -        .takeWhile(_.getModificationTime < thresholdTime)
    -        .foreach(x => remoteFs.delete(x.getPath, true))
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
    +      } else {
    +        // This shouldn't really happen, since the driver should register way before tokens expire
    +        // (or the AM should time out the application).
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
    +      }
         } catch {
    -      // Such errors are not fatal, so don't throw. Make sure they are logged though
           case e: Exception =>
    -        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
    -          "such warnings there may be an issue with your HDFS cluster.", e)
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again ${UIUtils.formatDuration(delay)}! " +
    +          "If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
         }
       }
     
    -  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
    -    // Keytab is copied by YARN to the working directory of the AM, so full path is
    -    // not needed.
    -
    -    // HACK:
    -    // HDFS will not issue new delegation tokens, if the Credentials object
    -    // passed in already has tokens for that FS even if the tokens are expired (it really only
    -    // checks if there are tokens for the service, and not if they are valid). So the only real
    -    // way to get new tokens is to make sure a different Credentials object is used each time to
    -    // get new tokens and then the new tokens are copied over the current user's Credentials.
    -    // So:
    -    // - we login as a different user and get the UGI
    -    // - use that UGI to get the tokens (see doAs block below)
    -    // - copy the tokens over to the current user's credentials (this will overwrite the tokens
    -    // in the current user's Credentials object for this FS).
    -    // The login to KDC happens each time new tokens are required, but this is rare enough to not
    -    // have to worry about (like once every day or so). This makes this code clearer than having
    -    // to login and then relogin every time (the HDFS API may not relogin since we don't use this
    -    // UGI directly for HDFS communication.
    -    logInfo(s"Attempting to login to KDC using principal: $principal")
    -    val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    -    logInfo("Successfully logged into KDC.")
    -    val tempCreds = keytabLoggedInUGI.getCredentials
    -    val credentialsPath = new Path(credentialsFile)
    -    val dst = credentialsPath.getParent
    -    var nearestNextRenewalTime = Long.MaxValue
    -    keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
    -      // Get a copy of the credentials
    -      override def run(): Void = {
    -        nearestNextRenewalTime = credentialManager.obtainDelegationTokens(
    -          freshHadoopConf,
    -          tempCreds)
    -        null
    +  /**
    +   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
    +   * new tokens before the new set expires.
    +   *
    +   * @return Credentials containing the new tokens.
    +   */
    +  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
    +    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
    +      override def run(): Credentials = {
    +        val creds = new Credentials()
    +        val nextRenewal = credentialManager.obtainDelegationTokens(hadoopConf, creds)
    +
    +        val timeToWait = SparkHadoopUtil.nextCredentialRenewalTime(nextRenewal, sparkConf) -
    +          System.currentTimeMillis()
    +        scheduleRenewal(timeToWait)
    --- End diff --
    
    the old code had more guards against bad renewal times: https://github.com/apache/spark/blob/ba622f45caa808a9320c1f7ba4a4f344365dcf90/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L403-L404
    
    mostly just an observation, I don't think those extra guards would actually help (a bad renewer could still schedule renewal every 1ms).
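
    For context, the guards being referenced were roughly of this shape (a sketch, not the exact lines at the link):

    ```
        // Only treat the returned renewal time as meaningful when a provider
        // actually reported one and it is still in the future.
        if (nearestNextRenewalTime != Long.MaxValue &&
            nearestNextRenewalTime > System.currentTimeMillis()) {
          // schedule renewal based on nearestNextRenewalTime
        }
    ```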


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20657


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172362119
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
    @@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security
     
     import java.security.PrivilegedExceptionAction
     import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.{FileSystem, Path}
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
     import org.apache.spark.deploy.SparkHadoopUtil
    -import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
     import org.apache.spark.deploy.yarn.config._
     import org.apache.spark.internal.Logging
     import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
     import org.apache.spark.util.ThreadUtils
     
     /**
    - * The following methods are primarily meant to make sure long-running apps like Spark
    - * Streaming apps can run without interruption while accessing secured services. The
    - * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
    - * This method wakes up a thread that logs into the KDC
    - * once 75% of the renewal interval of the original credentials used for the container
    - * has elapsed. It then obtains new credentials and writes them to HDFS in a
    - * pre-specified location - the prefix of which is specified in the sparkConf by
    - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
    - * - each update goes to a new file, with a monotonically increasing suffix), also the
    - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
    - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
    + * A manager tasked with periodically updating delegation tokens needed by the application.
      *
    - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
    - * called once 80% of the validity of the original credentials has elapsed. At that time the
    - * executor finds the credentials file with the latest timestamp and checks if it has read those
    - * credentials before (by keeping track of the suffix of the last file it read). If a new file has
    - * appeared, it will read the credentials and update the currently running UGI with it. This
    - * process happens again once 80% of the validity of this has expired.
    + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are both added to the current user, and also sent to the Spark driver
    + * once it's registered with the AM. The driver is tasked with distributing the tokens to other
    + * processes that might need them.
      */
     private[yarn] class AMCredentialRenewer(
         sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
    +    hadoopConf: Configuration) extends Logging {
     
    -  private var lastCredentialsFileSuffix = 0
    +  private val principal = sparkConf.get(PRINCIPAL).get
    +  private val keytab = sparkConf.get(KEYTAB).get
    +  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  private val credentialRenewerThread: ScheduledExecutorService =
    +  private val renewalExecutor: ScheduledExecutorService =
         ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
     
    -  private val hadoopUtil = SparkHadoopUtil.get
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
     
    -  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
    -  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
    -  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
    -  private val freshHadoopConf =
    -    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
    +  private val renewalTask = new Runnable() {
    +    override def run(): Unit = {
    +      updateTokensTask()
    +    }
    +  }
     
    -  @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
    +  def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
     
       /**
    -   * Schedule a login from the keytab and principal set using the --principal and --keytab
    -   * arguments to spark-submit. This login happens only when the credentials of the current user
    -   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
    -   * SparkConf to do the login. This method is a no-op in non-YARN mode.
    +   * Start the token renewer. Upon start, the renewer will:
    +   *
    +   * - log in the configured user, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
        *
    +   * @return The newly logged in user.
        */
    -  private[spark] def scheduleLoginFromKeytab(): Unit = {
    -    val principal = sparkConf.get(PRINCIPAL).get
    -    val keytab = sparkConf.get(KEYTAB).get
    -
    -    /**
    -     * Schedule re-login and creation of new credentials. If credentials have already expired, this
    -     * method will synchronously create new ones.
    -     */
    -    def scheduleRenewal(runnable: Runnable): Unit = {
    -      // Run now!
    -      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
    -      if (remainingTime <= 0) {
    -        logInfo("Credentials have expired, creating new ones now.")
    -        runnable.run()
    -      } else {
    -        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
    -        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
    +  def start(): UserGroupInformation = {
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
           }
         }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
     
    -    // This thread periodically runs on the AM to update the credentials on HDFS.
    -    val credentialRenewerRunnable =
    -      new Runnable {
    -        override def run(): Unit = {
    -          try {
    -            writeNewCredentialsToHDFS(principal, keytab)
    -            cleanupOldFiles()
    -          } catch {
    -            case e: Exception =>
    -              // Log the error and try to write new tokens back in an hour
    -              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
    -                "hour! If this happens too often tasks will fail.", e)
    -              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
    -              return
    -          }
    -          scheduleRenewal(this)
    -        }
    -      }
    -    // Schedule update of credentials. This handles the case of updating the credentials right now
    -    // as well, since the renewal interval will be 0, and the thread will get scheduled
    -    // immediately.
    -    scheduleRenewal(credentialRenewerRunnable)
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    // Transfer the original user's tokens to the new user, since that's needed to connect to
    +    // YARN. Explicitly avoid overwriting tokens that already exist in the current user's
    +    // credentials, since those were freshly obtained above (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    renewalExecutor.shutdown()
       }
     
    -  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
    -  // least numFilesToKeep files are kept for safety
    -  private def cleanupOldFiles(): Unit = {
    -    import scala.concurrent.duration._
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
    +  }
    +
    +  /**
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
    +   */
    +  private def updateTokensTask(): Unit = {
         try {
    -      val remoteFs = FileSystem.get(freshHadoopConf)
    -      val credentialsPath = new Path(credentialsFile)
    -      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
    -      hadoopUtil.listFilesSorted(
    -        remoteFs, credentialsPath.getParent,
    -        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    -        .dropRight(numFilesToKeep)
    -        .takeWhile(_.getModificationTime < thresholdTime)
    -        .foreach(x => remoteFs.delete(x.getPath, true))
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
    +      } else {
    +        // This shouldn't really happen, since the driver should register way before tokens expire
    +        // (or the AM should time out the application).
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
    +      }
         } catch {
    -      // Such errors are not fatal, so don't throw. Make sure they are logged though
           case e: Exception =>
    -        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
    -          "such warnings there may be an issue with your HDFS cluster.", e)
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again ${UIUtils.formatDuration(delay)}! " +
    +          "If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
         }
       }
     
    -  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
    -    // Keytab is copied by YARN to the working directory of the AM, so full path is
    -    // not needed.
    -
    -    // HACK:
    -    // HDFS will not issue new delegation tokens, if the Credentials object
    -    // passed in already has tokens for that FS even if the tokens are expired (it really only
    -    // checks if there are tokens for the service, and not if they are valid). So the only real
    -    // way to get new tokens is to make sure a different Credentials object is used each time to
    -    // get new tokens and then the new tokens are copied over the current user's Credentials.
    -    // So:
    -    // - we login as a different user and get the UGI
    -    // - use that UGI to get the tokens (see doAs block below)
    -    // - copy the tokens over to the current user's credentials (this will overwrite the tokens
    -    // in the current user's Credentials object for this FS).
    -    // The login to KDC happens each time new tokens are required, but this is rare enough to not
    -    // have to worry about (like once every day or so). This makes this code clearer than having
    -    // to login and then relogin every time (the HDFS API may not relogin since we don't use this
    -    // UGI directly for HDFS communication.
    -    logInfo(s"Attempting to login to KDC using principal: $principal")
    -    val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    -    logInfo("Successfully logged into KDC.")
    -    val tempCreds = keytabLoggedInUGI.getCredentials
    -    val credentialsPath = new Path(credentialsFile)
    -    val dst = credentialsPath.getParent
    -    var nearestNextRenewalTime = Long.MaxValue
    -    keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
    -      // Get a copy of the credentials
    -      override def run(): Void = {
    -        nearestNextRenewalTime = credentialManager.obtainDelegationTokens(
    -          freshHadoopConf,
    -          tempCreds)
    -        null
    +  /**
    +   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
    +   * new tokens before the new set expires.
    +   *
    +   * @return Credentials containing the new tokens.
    +   */
    +  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
    +    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
    +      override def run(): Credentials = {
    +        val creds = new Credentials()
    +        val nextRenewal = credentialManager.obtainDelegationTokens(hadoopConf, creds)
    +
    +        val timeToWait = SparkHadoopUtil.nextCredentialRenewalTime(nextRenewal, sparkConf) -
    +          System.currentTimeMillis()
    +        scheduleRenewal(timeToWait)
    --- End diff --
    
    The new code has pretty much the same check in `scheduleRenewal`:
    
    ```
        val _delay = math.max(0, delay)
    ```


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172361945
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -520,4 +520,16 @@ package object config {
           .checkValue(v => v > 0, "The threshold should be positive.")
           .createWithDefault(10000000)
     
    +  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
    +    ConfigBuilder("spark.security.credentials.renewalRatio")
    +      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
    +      .doubleConf
    +      .createWithDefault(0.75d)
    +
    +  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
    +    ConfigBuilder("spark.security.credentials.retryWait")
    +      .doc("How long to wait before retrying to fetch new credentials after a failure.")
    +      .timeConf(TimeUnit.SECONDS)
    +      .createWithDefaultString("1h")
    --- End diff --
    
    They're not really internal, just not documented.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #88066 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88066/testReport)** for PR 20657 at commit [`3eb6a32`](https://github.com/apache/spark/commit/3eb6a32f281816753db1007d1b673ba92dabaef5).


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r173073703
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -520,4 +520,16 @@ package object config {
           .checkValue(v => v > 0, "The threshold should be positive.")
           .createWithDefault(10000000)
     
    +  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
    +    ConfigBuilder("spark.security.credentials.renewalRatio")
    +      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
    +      .doubleConf
    +      .createWithDefault(0.75d)
    +
    +  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
    +    ConfigBuilder("spark.security.credentials.retryWait")
    +      .doc("How long to wait before retrying to fetch new credentials after a failure.")
    +      .timeConf(TimeUnit.SECONDS)
    +      .createWithDefaultString("1h")
    --- End diff --
    
    It'd be better to keep them undocumented, so that developers can still adjust them for testing, but end users don't need to touch them.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87621/
    Test PASSed.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #87983 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87983/testReport)** for PR 20657 at commit [`3294596`](https://github.com/apache/spark/commit/329459652fb40eb82b81ef66ad93cec05b9dd016).


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    @jerryshao @tgravescs 


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172362293
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---
    @@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging {
       private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
         UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
         val creds = deserialize(tokens)
    -    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
    +    logInfo("Updating delegation tokens for current user.")
    --- End diff --
    
    That information is already logged by the AM, that's enough for debugging.


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172604366
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -520,4 +520,16 @@ package object config {
           .checkValue(v => v > 0, "The threshold should be positive.")
           .createWithDefault(10000000)
     
    +  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
    +    ConfigBuilder("spark.security.credentials.renewalRatio")
    +      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
    +      .doubleConf
    +      .createWithDefault(0.75d)
    +
    +  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
    +    ConfigBuilder("spark.security.credentials.retryWait")
    +      .doc("How long to wait before retrying to fetch new credentials after a failure.")
    +      .timeConf(TimeUnit.SECONDS)
    +      .createWithDefaultString("1h")
    --- End diff --
    
    To me, internal means something that is for internal Spark use, e.g. the configs I'm removing, which are set by Spark itself.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    btw I took a look at the code in `MesosHadoopDelegationTokenManager`; there seems to be a lot of duplication that could probably be factored out, and I wonder if the things that are different really should be the same.  E.g. Mesos doesn't expose a conf for KERBEROS_RELOGIN; it's just using the renewal time from the delegation tokens.  Seems pretty easy for that to be wrong.
    
    I can open a separate ticket for that, but wanted to see if this makes sense.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172963955
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
    @@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security
     
     import java.security.PrivilegedExceptionAction
     import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.{FileSystem, Path}
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
     import org.apache.spark.deploy.SparkHadoopUtil
    -import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
     import org.apache.spark.deploy.yarn.config._
     import org.apache.spark.internal.Logging
     import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
     import org.apache.spark.util.ThreadUtils
     
     /**
    - * The following methods are primarily meant to make sure long-running apps like Spark
    - * Streaming apps can run without interruption while accessing secured services. The
    - * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
    - * This method wakes up a thread that logs into the KDC
    - * once 75% of the renewal interval of the original credentials used for the container
    - * has elapsed. It then obtains new credentials and writes them to HDFS in a
    - * pre-specified location - the prefix of which is specified in the sparkConf by
    - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
    - * - each update goes to a new file, with a monotonically increasing suffix), also the
    - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
    - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
    + * A manager tasked with periodically updating delegation tokens needed by the application.
      *
    - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
    - * called once 80% of the validity of the original credentials has elapsed. At that time the
    - * executor finds the credentials file with the latest timestamp and checks if it has read those
    - * credentials before (by keeping track of the suffix of the last file it read). If a new file has
    - * appeared, it will read the credentials and update the currently running UGI with it. This
    - * process happens again once 80% of the validity of this has expired.
    + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    --- End diff --
    
    for folks like me who are less familiar with this, this seems like a good spot to explain the overall flow a little bit more.  E.g.:
    
    The KDC provides a ticket granting ticket (tgt), which is then used to obtain delegation tokens for each service.  The KDC does not expose the tgt's expiry time, so renewal is controlled by a conf (by default 1m, much more frequent than usual expiry times).  Each service's delegation token provider should determine the expiry time of its delegation tokens, so they can be renewed appropriately.
    
    (in particular I needed an extra read to figure out why the tgt had its own renewal mechanism)
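
    A minimal sketch of the tgt relogin loop described above, using the names from the `AMCredentialRenewer` diff in this PR (`KERBEROS_RELOGIN_PERIOD` controls how often the check runs):

    ```
        val tgtRenewalTask = new Runnable() {
          override def run(): Unit = {
            // No-op when the TGT is still valid, so running this frequently is cheap.
            ugi.checkTGTAndReloginFromKeytab()
          }
        }
        val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
        renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
          TimeUnit.SECONDS)
    ```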


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #88427 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88427/testReport)** for PR 20657 at commit [`ab60dda`](https://github.com/apache/spark/commit/ab60dda1feb5c68f9ce6e67e14b777dd657e99a7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    At a high level this looks good.  I'm glad to see us passing these to the executors over RPC now.  I haven't looked in detail at the Mesos side.


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172325576
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---
    @@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging {
       private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
         UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
         val creds = deserialize(tokens)
    -    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
    +    logInfo("Updating delegation tokens for current user.")
    --- End diff --
    
    just a thought -- rather than just serializing the Credentials, would it be helpful to also serialize a timestamp for when the tokens were obtained and when they will be refreshed, so it could be logged here?
    You have spent more time debugging problematic cases, so you will probably have a better idea of whether that would be helpful.
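
    One shape the suggestion could take (purely illustrative; `TokensWithTimestamps` and its fields are invented here and not part of the PR):

    ```
        // Hypothetical wrapper bundling the serialized tokens with bookkeeping
        // timestamps, so receivers can log when the tokens were obtained and
        // when the next refresh is expected.
        case class TokensWithTimestamps(
            tokens: Array[Byte],  // serialized Credentials
            obtainedAt: Long,     // System.currentTimeMillis() when obtained
            nextRenewal: Long)    // expected time of the next refresh
    ```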


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #88066 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88066/testReport)** for PR 20657 at commit [`3eb6a32`](https://github.com/apache/spark/commit/3eb6a32f281816753db1007d1b673ba92dabaef5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Known flaky (SPARK-23458).


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172579936
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -520,4 +520,16 @@ package object config {
           .checkValue(v => v > 0, "The threshold should be positive.")
           .createWithDefault(10000000)
     
    +  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
    +    ConfigBuilder("spark.security.credentials.renewalRatio")
    +      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
    +      .doubleConf
    +      .createWithDefault(0.75d)
    +
    +  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
    +    ConfigBuilder("spark.security.credentials.retryWait")
    +      .doc("How long to wait before retrying to fetch new credentials after a failure.")
    +      .timeConf(TimeUnit.SECONDS)
    +      .createWithDefaultString("1h")
    --- End diff --
    
    I thought that's what internal meant... a user *could* still specify them, but we don't document them at all, so they're not a stable part of the API, etc.


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r172581601
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---
    @@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging {
       private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
         UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
         val creds = deserialize(tokens)
    -    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
    +    logInfo("Updating delegation tokens for current user.")
    --- End diff --
    
    yeah, I was thinking it might be handy to have it logged in the executors and driver as well, sort of as an RPC id, so you could correlate the log lines in case there was ever a delay in propagation or a failure to reach one executor, since you're choosing to always log something here.  Still, your call.


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r173380826
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
    @@ -18,221 +18,160 @@ package org.apache.spark.deploy.yarn.security
     
     import java.security.PrivilegedExceptionAction
     import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.{FileSystem, Path}
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
     import org.apache.spark.deploy.SparkHadoopUtil
    -import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
     import org.apache.spark.deploy.yarn.config._
     import org.apache.spark.internal.Logging
     import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
     import org.apache.spark.util.ThreadUtils
     
     /**
    - * The following methods are primarily meant to make sure long-running apps like Spark
    - * Streaming apps can run without interruption while accessing secured services. The
    - * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
    - * This method wakes up a thread that logs into the KDC
    - * once 75% of the renewal interval of the original credentials used for the container
    - * has elapsed. It then obtains new credentials and writes them to HDFS in a
    - * pre-specified location - the prefix of which is specified in the sparkConf by
    - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
    - * - each update goes to a new file, with a monotonically increasing suffix), also the
    - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
    - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
    + * A manager tasked with periodically updating delegation tokens needed by the application.
      *
    - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
    - * called once 80% of the validity of the original credentials has elapsed. At that time the
    - * executor finds the credentials file with the latest timestamp and checks if it has read those
    - * credentials before (by keeping track of the suffix of the last file it read). If a new file has
    - * appeared, it will read the credentials and update the currently running UGI with it. This
    - * process happens again once 80% of the validity of this has expired.
    + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the Kerberos login by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      */
     private[yarn] class AMCredentialRenewer(
         sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
    +    hadoopConf: Configuration) extends Logging {
     
    -  private var lastCredentialsFileSuffix = 0
    +  private val principal = sparkConf.get(PRINCIPAL).get
    +  private val keytab = sparkConf.get(KEYTAB).get
    +  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  private val credentialRenewerThread: ScheduledExecutorService =
    +  private val renewalExecutor: ScheduledExecutorService =
         ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
     
    -  private val hadoopUtil = SparkHadoopUtil.get
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
     
    -  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
    -  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
    -  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
    -  private val freshHadoopConf =
    --- End diff --
    
    Why don't we need to create a new Hadoop configuration with the FS cache disabled anymore?
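
    For reference, a rough sketch of what the removed bypass amounted to, assuming
    `getConfBypassingFSCache` simply flips Hadoop's per-scheme
    `fs.<scheme>.impl.disable.cache` switch (this is not the exact Spark helper).
    The old code needed it because it wrote credential files to HDFS and wanted a
    `FileSystem` instance that picked up fresh tokens; the new RPC-based token
    distribution never touches HDFS here, so no `FileSystem` handle is needed at all:

        // Rough sketch (assumed behavior, not the actual Spark helper): force
        // FileSystem.get to return a fresh, uncached instance for a URI scheme.
        import org.apache.hadoop.conf.Configuration

        def confBypassingFSCache(conf: Configuration, scheme: String): Configuration = {
          val fresh = new Configuration(conf)
          // With the cache disabled, FileSystem.get creates a new instance that
          // picks up the current user's (fresh) delegation tokens.
          fresh.setBoolean(s"fs.$scheme.impl.disable.cache", true)
          fresh
        }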


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    LGTM, just one small comment.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #87983 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87983/testReport)** for PR 20657 at commit [`3294596`](https://github.com/apache/spark/commit/329459652fb40eb82b81ef66ad93cec05b9dd016).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    **[Test build #88427 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88427/testReport)** for PR 20657 at commit [`ab60dda`](https://github.com/apache/spark/commit/ab60dda1feb5c68f9ce6e67e14b777dd657e99a7).


---



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20657#discussion_r173236591
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
    @@ -18,221 +18,160 @@ package org.apache.spark.deploy.yarn.security
     
     import java.security.PrivilegedExceptionAction
     import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.{FileSystem, Path}
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
     import org.apache.spark.deploy.SparkHadoopUtil
    -import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
     import org.apache.spark.deploy.yarn.config._
     import org.apache.spark.internal.Logging
     import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
     import org.apache.spark.util.ThreadUtils
     
     /**
    - * The following methods are primarily meant to make sure long-running apps like Spark
    - * Streaming apps can run without interruption while accessing secured services. The
    - * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
    - * This method wakes up a thread that logs into the KDC
    - * once 75% of the renewal interval of the original credentials used for the container
    - * has elapsed. It then obtains new credentials and writes them to HDFS in a
    - * pre-specified location - the prefix of which is specified in the sparkConf by
    - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
    - * - each update goes to a new file, with a monotonically increasing suffix), also the
    - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
    - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
    + * A manager tasked with periodically updating delegation tokens needed by the application.
      *
    - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
    - * called once 80% of the validity of the original credentials has elapsed. At that time the
    - * executor finds the credentials file with the latest timestamp and checks if it has read those
    - * credentials before (by keeping track of the suffix of the last file it read). If a new file has
    - * appeared, it will read the credentials and update the currently running UGI with it. This
    - * process happens again once 80% of the validity of this has expired.
    + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the Kerberos login by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      */
     private[yarn] class AMCredentialRenewer(
         sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
    +    hadoopConf: Configuration) extends Logging {
     
    -  private var lastCredentialsFileSuffix = 0
    +  private val principal = sparkConf.get(PRINCIPAL).get
    +  private val keytab = sparkConf.get(KEYTAB).get
    +  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  private val credentialRenewerThread: ScheduledExecutorService =
    +  private val renewalExecutor: ScheduledExecutorService =
         ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
     
    -  private val hadoopUtil = SparkHadoopUtil.get
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
     
    -  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
    -  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
    -  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
    -  private val freshHadoopConf =
    -    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
    +  private val renewalTask = new Runnable() {
    +    override def run(): Unit = {
    +      updateTokensTask()
    +    }
    +  }
     
    -  @volatile private var timeOfNextRenewal: Long = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
    +  def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
     
       /**
    -   * Schedule a login from the keytab and principal set using the --principal and --keytab
    -   * arguments to spark-submit. This login happens only when the credentials of the current user
    -   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
    -   * SparkConf to do the login. This method is a no-op in non-YARN mode.
    +   * Start the token renewer. Upon start, the renewer will:
        *
    +   * - log in the configured user, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
        */
    -  private[spark] def scheduleLoginFromKeytab(): Unit = {
    -    val principal = sparkConf.get(PRINCIPAL).get
    -    val keytab = sparkConf.get(KEYTAB).get
    -
    -    /**
    -     * Schedule re-login and creation of new credentials. If credentials have already expired, this
    -     * method will synchronously create new ones.
    -     */
    -    def scheduleRenewal(runnable: Runnable): Unit = {
    -      // Run now!
    -      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
    -      if (remainingTime <= 0) {
    -        logInfo("Credentials have expired, creating new ones now.")
    -        runnable.run()
    -      } else {
    -        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
    -        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
    +  def start(): UserGroupInformation = {
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
           }
         }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
     
    -    // This thread periodically runs on the AM to update the credentials on HDFS.
    -    val credentialRenewerRunnable =
    -      new Runnable {
    -        override def run(): Unit = {
    -          try {
    -            writeNewCredentialsToHDFS(principal, keytab)
    -            cleanupOldFiles()
    -          } catch {
    -            case e: Exception =>
    -              // Log the error and try to write new tokens back in an hour
    -              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
    -                "hour! If this happens too often tasks will fail.", e)
    -              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
    -              return
    -          }
    -          scheduleRenewal(this)
    -        }
    -      }
    -    // Schedule update of credentials. This handles the case of updating the credentials right now
    -    // as well, since the renewal interval will be 0, and the thread will get scheduled
    -    // immediately.
    -    scheduleRenewal(credentialRenewerRunnable)
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    // Transfer the original user's tokens to the new user, since that's needed to connect to
    +    // YARN. Explicitly avoid overwriting tokens that already exist in the current user's
    +    // credentials, since those were freshly obtained above (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    renewalExecutor.shutdown()
    +  }
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
       }
     
    -  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
    -  // least numFilesToKeep files are kept for safety
    -  private def cleanupOldFiles(): Unit = {
    -    import scala.concurrent.duration._
    +  /**
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
    +   */
    +  private def updateTokensTask(): Unit = {
         try {
    -      val remoteFs = FileSystem.get(freshHadoopConf)
    -      val credentialsPath = new Path(credentialsFile)
    -      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
    -      hadoopUtil.listFilesSorted(
    -        remoteFs, credentialsPath.getParent,
    -        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    -        .dropRight(numFilesToKeep)
    -        .takeWhile(_.getModificationTime < thresholdTime)
    -        .foreach(x => remoteFs.delete(x.getPath, true))
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
    +      } else {
    +        // This shouldn't really happen, since the driver should register way before tokens expire
    +        // (or the AM should time out the application).
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
    +      }
         } catch {
    -      // Such errors are not fatal, so don't throw. Make sure they are logged though
           case e: Exception =>
    -        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
    -          "such warnings there may be an issue with your HDFS cluster.", e)
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
         }
       }
     
    -  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
    -    // Keytab is copied by YARN to the working directory of the AM, so full path is
    -    // not needed.
    -
    -    // HACK:
    --- End diff --
    
    Not sure I understand the question. This comment talks about a lot of things. The only thing that still applies is using a new UGI to get new delegation tokens. That's not really a hack; it's just how the API works...
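
    A minimal sketch of that pattern (not the PR's exact code; `credentialManager`
    is assumed to be the `YARNHadoopDelegationTokenManager` from the diff above):
    log in from the keytab to get an isolated UGI, then obtain tokens inside `doAs`
    so they are issued against that fresh login rather than the current user's
    possibly-expired credentials:

        import java.security.PrivilegedExceptionAction
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.security.{Credentials, UserGroupInformation}

        def fetchTokensWithFreshUGI(
            principal: String,
            keytab: String,
            hadoopConf: Configuration,
            credentialManager: YARNHadoopDelegationTokenManager): Credentials = {
          // A separate login that does not disturb the process's current user.
          val freshUGI =
            UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
          // Token providers consult the *current* UGI, so run them inside doAs.
          freshUGI.doAs(new PrivilegedExceptionAction[Credentials] {
            override def run(): Credentials = {
              val creds = new Credentials()
              credentialManager.obtainDelegationTokens(hadoopConf, creds)
              creds
            }
          })
        }

    The diff above then merges the original credentials back into the new UGI with
    `mergeAll`, so the tokens YARN needs (e.g. for log aggregation) are kept without
    overwriting the freshly obtained ones.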


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1372/
    Test PASSed.


---



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20657
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1000/
    Test PASSed.


---
