Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2018/10/03 22:43:31 UTC

[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/22624

    [SPARK-23781][CORE] Add base class for token renewal functionality.

    The base class is based on YARN's token renewer, with some new
    functionality to support the current usage in Mesos. It supports
    both renewal (which you get when you provide a principal and keytab
    to Spark) and just creating an arbitrary set of tokens, which is
    used by Mesos in the non-renewal case.
    
    The internal API is a little sub-optimal because YARN needs to defer
    some of the initialization of the renewer; so the code needs to handle
    the case where the renewer doesn't yet have a reference to the driver
    endpoint, for example.
    
    YARN was tested with a custom app to stress the token renewal code in
    a real cluster. The driver behavior was tested with a hacked-up Mesos
    cluster: access to Hive and HDFS worked with both TGT and keytab, and
    delegation tokens were obtained, but I couldn't get executors to run.
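
    For reviewers, a minimal sketch of how a backend is expected to drive
    the two modes described above. `ExampleRenewer` and `connect` are
    illustrative, not part of the patch; `start()`, `createAndUpdateTokens()`,
    `setDriverRef()` and `renewalEnabled` are from it (any abstract members
    not shown in the diff hunks are elided):

    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.Credentials

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.security.AbstractCredentialRenewer
    import org.apache.spark.rpc.RpcEndpointRef

    // Hypothetical backend-specific renewer, for illustration only.
    private class ExampleRenewer(conf: SparkConf, hadoopConf: Configuration)
        extends AbstractCredentialRenewer(conf, hadoopConf) {

      // Abstract in the base class: fetch tokens for all configured services
      // into `creds` and return the time by which they must be renewed.
      override protected def obtainDelegationTokens(creds: Credentials): Long = ???

      def connect(driver: RpcEndpointRef): Unit = {
        setDriverRef(driver)
        if (renewalEnabled) {
          start()                  // principal + keytab: log in, schedule renewal
        } else {
          createAndUpdateTokens()  // one-shot: create tokens, send to the driver
        }
      }
    }
    ```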

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-23781

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22624.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22624
    
----
commit 01e39466d622665b63576e90b8fd0da7cae7e43f
Author: Marcelo Vanzin <va...@...>
Date:   2018-10-02T20:24:21Z

    [SPARK-23781][CORE] Add base class for token renewal functionality.
    
    The base class is based on YARN's token renewer, with some new
    functionality to support the current usage in Mesos. It supports
    both renewal (which you get when you provide a principal and keytab
    to Spark) and just creating an arbitrary set of tokens, which is
    used by Mesos in the non-renewal case.
    
    The internal API is a little sub-optimal because YARN needs to defer
    some of the initialization of the renewer; so the code needs to handle
    the case where the renewer doesn't yet have a reference to the driver
    endpoint, for example.
    
    YARN was tested with a custom app to stress the token renewal code in
    a real cluster. The driver behavior was tested with a hacked-up Mesos
    cluster: access to Hive and HDFS worked with both TGT and keytab, and
    delegation tokens were obtained, but I couldn't get executors to run.

----


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223766218
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the Kerberos login, renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    +    if (principal != null) {
    +      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +    } else {
    +      null
    +    }
    +
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
    +
    +  protected def renewalEnabled: Boolean = principal != null
    +
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
    +   */
    +  def start(): UserGroupInformation = {
    +    require(renewalEnabled, "Token renewal is disabled.")
    +
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
    +      }
    +    }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
    +
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    val driver = driverRef.get()
    +    if (driver != null) {
    --- End diff --
    
    > What about extending this start() method with a boolean flag sendingInitialToken
    
    That is just as awkward, because then you'd have to set the driver ref before calling start with that flag set to true.
    
    The API is just going to be a little awkward until we refactor the way the YARN code deals with tokens. The goal here is not to fix the API, but to share all the rest of the logic.
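
    I.e. with that flag the caller would be forced into this ordering (a
    sketch; `sendingInitialToken` is the hypothetical flag from the
    suggestion above):

    ```scala
    // The driver ref would have to be set first for the flag to mean anything.
    setDriverRef(driverEndpoint)
    start(sendingInitialToken = true)
    ```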


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3826/
    Test PASSed.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r229067463
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -230,7 +242,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             val reply = SparkAppConfig(
               sparkProperties,
               SparkEnv.get.securityManager.getIOEncryptionKey(),
    -          fetchHadoopDelegationTokens())
    +          Option(delegationTokens.get()))
    --- End diff --
    
    Not sure I follow. To be clear, I definitely think this is extremely unlikely, but I don't see what totally prevents it.
    
    >  So the message to set these tokens is the first message queued on the driver, and this reference will already be set when the first executor asks for this info.
    
    I see that the message will get queued from the TokenManager to get sent to the SchedulerBackend before anything else happens. But I was a bit hazy on what happens after that -- I guess because both are local, the way the RPC env works, nothing else can squeeze in between? It's definitely not obvious -- seems it's worth at least a short comment like "Because the token manager is local, we know we will have already received the initial set of tokens (if any)"
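
    Something like this, on the flagged line above (a sketch):

    ```scala
    val reply = SparkAppConfig(
      sparkProperties,
      SparkEnv.get.securityManager.getIOEncryptionKey(),
      // Because the token manager is local to the driver, the initial set of
      // tokens (if any) has already been received by the time the first
      // executor asks for this config.
      Option(delegationTokens.get()))
    ```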


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223767033
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the Kerberos login, renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    +    if (principal != null) {
    +      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +    } else {
    +      null
    +    }
    +
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
    +
    +  protected def renewalEnabled: Boolean = principal != null
    +
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
    +   */
    +  def start(): UserGroupInformation = {
    +    require(renewalEnabled, "Token renewal is disabled.")
    +
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
    +      }
    +    }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
    +
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    val driver = driverRef.get()
    +    if (driver != null) {
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +      driver.send(UpdateDelegationTokens(tokens))
    +    }
    +
    +    // Transfer the original user's tokens to the new user, since they may contain needed tokens
    +    // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +    // exist in the current user's credentials, since those were freshly obtained above
    +    // (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    if (renewalExecutor != null) {
    +      renewalExecutor.shutdown()
    +    }
    +  }
    +
    +  /** Create new tokens for the current user and distribute them to the driver. */
    +  protected def createAndUpdateTokens(): Unit = {
    +    val driver = driverRef.get()
    +    require(driver != null, "Driver endpoint not set.")
    +
    +    val creds = new Credentials()
    +    obtainDelegationTokens(creds)
    +    UserGroupInformation.getCurrentUser.addCredentials(creds)
    +
    +    val tokens = SparkHadoopUtil.get.serialize(creds)
    +    driver.send(UpdateDelegationTokens(tokens))
    +  }
    +
    +  /**
    +   * Fetch new delegation tokens for configured services, storing them in the given credentials.
    +   *
    +   * @return The time by which the tokens must be renewed.
    +   */
    +  protected def obtainDelegationTokens(creds: Credentials): Long
    --- End diff --
    
    This makes it more flexible. I also really dislike returning tuples.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97506 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97506/testReport)** for PR 22624 at commit [`b58dd8c`](https://github.com/apache/spark/commit/b58dd8c7f16624ec3f47f7c1b149b6644a6512ac).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    There's stuff that I need to fix for the recent changes in the Kubernetes code; also I'm going to do the work I meant to do for SPARK-25693 here, since it requires as much testing and isn't that much more code. So hang on a bit.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #96913 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96913/testReport)** for PR 22624 at commit [`01e3946`](https://github.com/apache/spark/commit/01e39466d622665b63576e90b8fd0da7cae7e43f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223815272
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
    @@ -34,15 +34,9 @@ private[spark] class MesosHadoopDelegationTokenManager(
     
       private val tokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  def start(driverEndpoint: RpcEndpointRef): Unit = {
    -    require(driverEndpoint != null, "DriverEndpoint is not initialized")
    -    setDriverRef(driverEndpoint)
    -    if (renewalEnabled) {
    -      super.start()
    -    } else {
    -      logInfo("Using ticket cache for Kerberos authentication, no token renewal.")
    -      createAndUpdateTokens()
    -    }
    +  override def start(driver: Option[RpcEndpointRef]): UserGroupInformation = {
    +    require(driver.orNull != null, "Driver endpoint is not initialized")
    --- End diff --
    
    `Some(null).isDefined` is true. And if you get initialization order wrong in these classes, you can get into that situation. So this is safer.
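
    For reference, in the REPL:

    ```scala
    scala> Some(null).isDefined
    res0: Boolean = true

    scala> Option(null).isDefined
    res1: Boolean = false
    ```

    so `driver.orNull != null` rejects both `None` and `Some(null)`, while an
    `isDefined` check would let `Some(null)` through.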


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223605049
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the Kerberos login, renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    +    if (principal != null) {
    +      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +    } else {
    +      null
    +    }
    +
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
    +
    +  protected def renewalEnabled: Boolean = principal != null
    +
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
    +   */
    +  def start(): UserGroupInformation = {
    +    require(renewalEnabled, "Token renewal is disabled.")
    +
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
    +      }
    +    }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
    +
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    val driver = driverRef.get()
    +    if (driver != null) {
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +      driver.send(UpdateDelegationTokens(tokens))
    +    }
    +
    +    // Transfer the original user's tokens to the new user, since they may contain needed tokens
    +    // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +    // exist in the current user's credentials, since those were freshly obtained above
    +    // (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    if (renewalExecutor != null) {
    +      renewalExecutor.shutdown()
    +    }
    +  }
    +
    +  /** Create new tokens for the current user and distribute them to the driver. */
    +  protected def createAndUpdateTokens(): Unit = {
    +    val driver = driverRef.get()
    +    require(driver != null, "Driver endpoint not set.")
    +
    +    val creds = new Credentials()
    +    obtainDelegationTokens(creds)
    +    UserGroupInformation.getCurrentUser.addCredentials(creds)
    +
    +    val tokens = SparkHadoopUtil.get.serialize(creds)
    +    driver.send(UpdateDelegationTokens(tokens))
    +  }
    +
    +  /**
    +   * Fetch new delegation tokens for configured services, storing them in the given credentials.
    +   *
    +   * @return The time by which the tokens must be renewed.
    +   */
    +  protected def obtainDelegationTokens(creds: Credentials): Long
    --- End diff --
    
    As obtainDelegationTokens is always called with a freshly created Credentials (which is more like an output variable), does it make sense to move the Credentials construction into this method and give back a tuple of Credentials and Long? (If such a function were passed as an argument, the inheritance could even be avoided.)
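
    I.e. something like this (a sketch of the suggested shape, not what the
    patch does):

    ```scala
    // The method would own the Credentials it fills, returning both the
    // credentials and the time by which they must be renewed.
    protected def obtainDelegationTokens(): (Credentials, Long)

    // Call site:
    val (creds, nextRenewal) = obtainDelegationTokens()
    ```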


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3834/
    Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Another small change: rename AbstractCredentialRenewer -> AbstractCredentialManager. I like that name better since I'm adding more than just renewal functionality to that class, but it sort of clashes with the existing classes. I'll file a separate bug to rename those classes instead.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    One minor comment about the `start` API, but otherwise LGTM from the YARN side. I would need a bit more time to look at the other cluster managers if nobody else can vouch for those.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228398489
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -110,32 +209,105 @@ private[spark] class HadoopDelegationTokenManager(
       }
     
       /**
    -   * Get delegation token provider for the specified service.
    +   * List of file systems for which to obtain delegation tokens. The base implementation
    +   * returns just the default file system in the given Hadoop configuration.
        */
    -  def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
    -    delegationTokenProviders.get(service)
    +  protected def fileSystemsToAccess(): Set[FileSystem] = {
    +    Set(FileSystem.get(hadoopConf))
    +  }
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(_delay)}.")
    +
    +    val renewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        updateTokensTask()
    +      }
    +    }
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
       }
     
       /**
    -   * Writes delegation tokens to creds.  Delegation tokens are fetched from all registered
    -   * providers.
    -   *
    -   * @param hadoopConf hadoop Configuration
    -   * @param creds Credentials that will be updated in place (overwritten)
    -   * @return Time after which the fetched delegation tokens should be renewed.
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
        */
    -  def obtainDelegationTokens(
    -      hadoopConf: Configuration,
    -      creds: Credentials): Long = {
    -    delegationTokenProviders.values.flatMap { provider =>
    -      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
    -        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
    +  private def updateTokensTask(): Unit = {
    +    try {
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
           } else {
    -        logDebug(s"Service ${provider.serviceName} does not require a token." +
    -          s" Check your configuration to see if security is disabled or not.")
    -        None
    +        // This shouldn't really happen, since the driver should register way before tokens expire.
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
           }
    -    }.foldLeft(Long.MaxValue)(math.min)
    +    } catch {
    +      case e: Exception =>
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
    +    }
    +  }
    +
    +  /**
    +   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
    +   * new tokens before the new set expires.
    +   *
    +   * @return Credentials containing the new tokens.
    +   */
    +  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
    +    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
    +      override def run(): Credentials = {
    +        val creds = new Credentials()
    +        val nextRenewal = obtainDelegationTokens(creds)
    +
    +        // Calculate the time when new credentials should be created, based on the configured
    +        // ratio.
    +        val now = System.currentTimeMillis
    +        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
    +        val adjustedNextRenewal = (now + (ratio * (nextRenewal - now))).toLong
    +
    +        scheduleRenewal(adjustedNextRenewal - now)
    --- End diff --
    
    you're adding `now` and subtracting it off again; instead you could do
    
    ```scala
    val adjustedRenewalDelay = (ratio * (nextRenewal - now)).toLong
    scheduleRenewal(adjustedRenewalDelay)
    ```


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223767268
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the Kerberos login, renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    --- End diff --
    
    The goal is to make it simple for scheduler backends to use token functionality. Adding more classes that the backends need to deal with complicates things instead of making them simpler.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    I tested the last update with YARN cluster and client modes; both seem happy.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97163/
    Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97164 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97164/testReport)** for PR 22624 at commit [`90e878d`](https://github.com/apache/spark/commit/90e878ddfdbe1048f01983bcfa8cecfb526a81dc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    > ...calls fetchDelegationTokens() twice since the tokenRenewalInterval...
    
    That is true - the second fetch is only needed for YARN because reasons. But that's also completely unrelated to this change, so please file a separate bug for it.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    > Is there any place which documents this overall flow?
    
    Not yet. I plan to write a README.md explaining this, separately, since there's a lot of
    confusion not just about the flow, but about exactly what this code does and why.
    
    > add delegations tokens inside CoarseGrainedSchedulerBackend instead of YarnSchedulerBackend
    
    I'm not 100% sure that's necessary. In YARN client mode + keytab, it's needed because
    the AM handles the login and sends DTs back to the driver. But in other modes, the driver
    is expected to have a valid TGT at all times, so it should not need DTs.
    
    But I see that the old Mesos code did what you're saying, so perhaps it's better to keep
    that and avoid a regression. I just need to test that it doesn't do anything funny on YARN.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #96913 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96913/testReport)** for PR 22624 at commit [`01e3946`](https://github.com/apache/spark/commit/01e39466d622665b63576e90b8fd0da7cae7e43f).


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97164 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97164/testReport)** for PR 22624 at commit [`90e878d`](https://github.com/apache/spark/commit/90e878ddfdbe1048f01983bcfa8cecfb526a81dc).


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223766731
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the Kerberos login, renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    --- End diff --
    
    `Option` doesn't add anything useful here.
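
    I.e. either way each use site carries the same guard (a sketch):

    ```scala
    // With Option:
    renewalExecutor.foreach(_.shutdown())

    // With the nullable field the patch uses:
    if (renewalExecutor != null) {
      renewalExecutor.shutdown()
    }
    ```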
    
    I want `createAndUpdateTokens` here because it will be useful for k8s client mode and for when the YARN code is refactored. Just like it is in Mesos today.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97173 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97173/testReport)** for PR 22624 at commit [`638bb07`](https://github.com/apache/spark/commit/638bb0700d3faa6753a857f70035fce8f95b6a8b).


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4547/
    Test PASSed.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r227985360
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver where to send the newly generated tokens.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driver.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    +        driver.send(UpdateDelegationTokens(tokens))
    +      }
    +
    +      // Transfer the original user's tokens to the new user, since it may contain needed tokens
    +      // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +      // exist in the current user's credentials, since those were freshly obtained above
    +      // (see SPARK-23361).
    +      val existing = ugi.getCredentials()
    +      existing.mergeAll(originalCreds)
    +      ugi.addCredentials(existing)
    +      ugi
    +    } else {
    --- End diff --
    
    Yes, I'd prefer any k8s-specific enhancements to be made together with the corresponding k8s code.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by ifilonenko <gi...@git.apache.org>.
Github user ifilonenko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r227062715
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver where to send the newly generated tokens.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    --- End diff --
    
    Are there going to be unit tests for these kinds of methods? Trying to mock calls like `UserGroupInformation.getCurrentUser().getCredentials()` will be tricky. If not, then the way the methods are structured appears to be fine, given successful e2e tests.
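
    (One way to avoid mocking the UGI statics, as a rough sketch: Hadoop ships a test helper for creating a throwaway user, and running assertions inside `doAs()` makes `getCurrentUser()` return it. Names below are illustrative.)

        import java.security.PrivilegedExceptionAction
        import org.apache.hadoop.security.UserGroupInformation

        // Create a fake user without touching the real login context.
        val testUgi = UserGroupInformation.createUserForTesting(
          "testuser@EXAMPLE.COM", Array("testgroup"))

        // Inside doAs(), getCurrentUser() resolves to the fake user, so no
        // static mocking is required.
        testUgi.doAs(new PrivilegedExceptionAction[Unit] {
          override def run(): Unit = {
            assert(UserGroupInformation.getCurrentUser().getUserName()
              .startsWith("testuser"))
          }
        })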


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    I'm trying to get things to work on Mesos in my hacked-up cluster; I might need some small adjustments to the code.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r229068643
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  require((principal == null) == (keytab == null),
    +    "Both principal and keytab must be defined, or neither.")
    +  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driverEndpoint If provided, the driver where to send the newly generated tokens.
    +   *                       The same ref will also receive future token updates unless overridden
    +   *                       later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driverEndpoint: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driverEndpoint.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    +        driver.send(UpdateDelegationTokens(tokens))
    +      }
    +
    +      // Transfer the original user's tokens to the new user, since it may contain needed tokens
    +      // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +      // exist in the current user's credentials, since those were freshly obtained above
    +      // (see SPARK-23361).
    +      val existing = ugi.getCredentials()
    +      existing.mergeAll(originalCreds)
    +      ugi.addCredentials(existing)
    +      ugi
    +    } else {
    +      driverEndpoint.foreach { driver =>
    --- End diff --
    
    OK, so we agree that is a bit ugly and maybe there isn't a better API currently -- but are you opposed to the other suggestions until the other changes happen? I know it's a small thing; I just think it would help keep the usage clear (another set of asserts that resolves the ambiguity a different way is fine too, I just don't like the ambiguity).
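
    (For concreteness, the kind of assert meant here could look like this sketch against the current revision:)

        // Hypothetical guard: fail fast instead of silently doing nothing when
        // there is neither a keytab login nor a driver to send tokens to.
        require(principal != null || driverEndpoint.isDefined,
          "start() requires either a configured principal or a driver endpoint.")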


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r229010495
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -230,7 +242,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             val reply = SparkAppConfig(
               sparkProperties,
               SparkEnv.get.securityManager.getIOEncryptionKey(),
    -          fetchHadoopDelegationTokens())
    +          Option(delegationTokens.get()))
    --- End diff --
    
    I realized the other changes were small, so I took a look -- this is the only part which concerns me. This used to be a blocking call, so you were sure that you'd get tokens to respond to `RetrieveSparkAppConfig`. However, now the delegation tokens are fetched asynchronously. You will probably get them back from the TokenManager before sending them here, but there is a race, right? Will it be OK if the executor doesn't get the tokens here? I know you'll send the executor the tokens in an `UpdateDelegationTokens` message later, but will that lead to some spurious failures?
    
    You could change the token manager to call `driver.ask(UpdateDelegationTokens(tokens))` inside `start()` and ensure that future is completed here, though it makes things more complex.
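
    (Sketch of that variant; the reply type and timeout are assumptions, and the driver endpoint would need a `receiveAndReply` case for this message:)

        import scala.concurrent.duration._
        import org.apache.spark.util.ThreadUtils

        // Inside start(): block until the driver acknowledges the initial tokens,
        // so RetrieveSparkAppConfig can never observe a missing token set.
        val ack = driver.ask[Boolean](UpdateDelegationTokens(tokens))
        ThreadUtils.awaitResult(ack, 30.seconds)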


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by ifilonenko <gi...@git.apache.org>.
Github user ifilonenko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r227984918
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver where to send the newly generated tokens.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driver.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    +        driver.send(UpdateDelegationTokens(tokens))
    +      }
    +
    +      // Transfer the original user's tokens to the new user, since it may contain needed tokens
    +      // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +      // exist in the current user's credentials, since those were freshly obtained above
    +      // (see SPARK-23361).
    +      val existing = ugi.getCredentials()
    +      existing.mergeAll(originalCreds)
    +      ugi.addCredentials(existing)
    +      ugi
    +    } else {
    --- End diff --
    
    Can we make this method overridable? While these two cases are the only ones in YARN and Mesos, Kubernetes has a third option with pre-defined secrets, using Kubernetes-specific watchers, which I wish to leverage and which would also end with `ref.send(UpdateDelegationTokens(tokens))`. Unless you prefer to have the follow-up PR do this refactoring.
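
    (Roughly the shape in mind, as a hypothetical sketch -- the watcher hook below is made up, not real fabric8 wiring:)

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.security.UserGroupInformation
        import org.apache.spark.SparkConf
        import org.apache.spark.rpc.RpcEndpointRef
        import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens

        private[spark] class KubernetesTokenManager(
            sparkConf: SparkConf,
            hadoopConf: Configuration)
          extends HadoopDelegationTokenManager(sparkConf, hadoopConf) {

          override def start(driver: Option[RpcEndpointRef]): UserGroupInformation = {
            driver.foreach(setDriverRef)
            onSecretUpdate { tokens =>  // hypothetical callback for Secret changes
              driver.foreach(_.send(UpdateDelegationTokens(tokens)))
            }
            null  // no keytab login is performed in this mode
          }

          // Hypothetical: register a Kubernetes Watcher on the token Secret and
          // invoke `f` with the serialized credentials on every modification.
          private def onSecretUpdate(f: Array[Byte] => Unit): Unit = ()
        }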


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98105/
    Test PASSed.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228400760
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver where to send the newly generated tokens.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driver.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    --- End diff --
    
    `driver` is also the name of the arg to this method; can you rename one or the other? Maybe rename the arg to `driverOpt`?
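
    (E.g., as a sketch with only the naming changed; everything but the relevant lines is elided:)

        def start(driverOpt: Option[RpcEndpointRef] = None): UserGroupInformation = {
          driverOpt.foreach(setDriverRef)
          val driver = driverRef.get()  // no longer shadows the parameter
          // ... rest of the method body unchanged from the PR (elided)
          null
        }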


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #96915 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96915/testReport)** for PR 22624 at commit [`5b7604e`](https://github.com/apache/spark/commit/5b7604e77122b4c44048b14e8be47d17eeffd866).


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96913/
    Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223607997
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    --- End diff --
    
    Why not extract the `createAndUpdateTokens()` functionality (along with `obtainDelegationTokens`) into a separate class that this `Renewer` would use as well? Then `Renewer` would be a non-abstract class, and `renewalEnabled` and all the conditions using it would go away. Providing a principal could even be made mandatory, so it could be checked at the member val.
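
    (Sketch of that split, with hypothetical names -- only the shape matters, the bodies are placeholders:)

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.security.Credentials
        import org.apache.spark.SparkConf
        import org.apache.spark.internal.config._

        // Plain token creation, usable on its own (wraps the provider loop).
        private[spark] class DelegationTokenSupport(
            sparkConf: SparkConf,
            hadoopConf: Configuration) {
          def obtainDelegationTokens(creds: Credentials): Unit = {
            // iterate the configured providers and collect tokens into `creds`
          }
        }

        // The renewer composes it and can then require a principal outright,
        // so `renewalEnabled` and its conditionals disappear.
        private[spark] class CredentialRenewer(
            sparkConf: SparkConf,
            hadoopConf: Configuration,
            tokens: DelegationTokenSupport) {
          private val principal = sparkConf.get(PRINCIPAL).getOrElse(
            throw new IllegalArgumentException("A principal is required for renewal."))
          // TGT relogin and token refresh scheduling would live here.
        }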


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97623 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97623/testReport)** for PR 22624 at commit [`b3a282e`](https://github.com/apache/spark/commit/b3a282e9d9cc0a6202091603e84c1ce0d50269b7).


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228400112
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala ---
    @@ -18,45 +18,20 @@
     package org.apache.spark.deploy.k8s.security
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +import org.apache.hadoop.security.UserGroupInformation
     
     import org.apache.spark.SparkConf
    -import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.internal.Logging
     
     /**
    - * The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens
    - * on the behalf of the Kubernetes submission client. The new credentials
    - * (called Tokens when they are serialized) are stored in Secrets accessible
    - * to the driver and executors, when new Tokens are received they overwrite the current Secrets.
    + * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
      */
     private[spark] class KubernetesHadoopDelegationTokenManager(
    --- End diff --
    
    This class doesn't really seem necessary anymore, but it's not a big deal.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228644038
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    --- End diff --
    
    yeah, agree it's weird that the check is in so many places -- that bugged me too, but I wasn't really sure whether there were other entry points.  But in any case, yes, I think anyplace we're going to check, let's be consistent.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228788072
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  require((principal == null) == (keytab == null),
    +    "Both principal and keytab must be defined, or neither.")
    +  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driverEndpoint If provided, the driver to send the newly generated tokens to.
    +   *                       The same ref will also receive future token updates unless overridden
    +   *                       later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driverEndpoint: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driverEndpoint.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    +        driver.send(UpdateDelegationTokens(tokens))
    +      }
    +
    +      // Transfer the original user's tokens to the new user, since they may contain needed tokens
    +      // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +      // exist in the current user's credentials, since those were freshly obtained above
    +      // (see SPARK-23361).
    +      val existing = ugi.getCredentials()
    +      existing.mergeAll(originalCreds)
    +      ugi.addCredentials(existing)
    +      ugi
    +    } else {
    +      driverEndpoint.foreach { driver =>
    --- End diff --
    
    though this isn't a concern with the way it's currently used, the behavior is a little confusing if you first call `setDriverRef` and then later call `start()` with no args.  It's also pointless to call the no-arg `start()` if principal & keytab aren't set.  Would it make sense to (a) here, also use `driverRef`, so calling this with no args still works if you set the driverRef earlier, and (b) add an `assert(driverRef.get() != null || principal != null)`?
    
    I tried to also think whether it made sense to change this to a couple of different non-private methods, but didn't come up with anything that I thought would be better ...
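    
    e.g., a rough sketch of what I mean (untested, and hand-waving the existing method bodies):
    
        def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
          driver.foreach(setDriverRef)
          // (b) fail fast: with neither a driver ref nor a principal, start() is a no-op
          assert(driverRef.get() != null || principal != null)
        
          if (principal != null) {
            // ... existing keytab login + renewal scheduling, unchanged ...
            doLogin()
          } else {
            // (a) read from driverRef rather than the `driver` arg, so an earlier
            // setDriverRef() call still works with the no-arg start()
            Option(driverRef.get()).foreach { ref =>
              val creds = new Credentials()
              obtainDelegationTokens(creds)
              ref.send(UpdateDelegationTokens(SparkHadoopUtil.get.serialize(creds)))
            }
            null
          }
        }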


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4613/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r227072541
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver to send the newly generated tokens to.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    --- End diff --
    
    This code has always lacked proper unit tests. In part it's because it's hard to tell what a good unit test is here, in part because no one ever bothered. It's also lacking integration tests, although I have a few internally that I generally run on these changes.
    
    That needs to be looked at separately; adding changes here without knowing what a good unit test for this code is would be kinda pointless.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98238/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #98105 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98105/testReport)** for PR 22624 at commit [`da8be70`](https://github.com/apache/spark/commit/da8be70321dcc3d943535231d5eb26591f5115e0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r229059902
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -230,7 +242,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             val reply = SparkAppConfig(
               sparkProperties,
               SparkEnv.get.securityManager.getIOEncryptionKey(),
    -          fetchHadoopDelegationTokens())
    +          Option(delegationTokens.get()))
    --- End diff --
    
    Actually since I moved the token manager to `CoarseGrainedSchedulerBackend` I think I can simplify this and avoid having to trace how the RPC code works to prove this is fine. I'll try to update things this afternoon after I finish some other stuff.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97506 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97506/testReport)** for PR 22624 at commit [`b58dd8c`](https://github.com/apache/spark/commit/b58dd8c7f16624ec3f47f7c1b149b6644a6512ac).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97623/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228397209
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    --- End diff --
    
    what if the keytab is specified but not the principal? Should this be the same check as in Client.scala?
    
    https://github.com/apache/spark/blob/79f3babcc6e189d7405464b9ac1eb1c017e51f5d/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L107-L108
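    
    i.e. something like the symmetric form (just a sketch):
    
        // either both are set or neither is -- keytab without principal fails too
        require((principal == null) == (keytab == null),
          "Both principal and keytab must be defined, or neither.")
        require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")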


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96915/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    >> Is there any place which documents this overall flow?
    >
    > Not yet. I plan to write a README.md explaining this, separately, since there's a lot of
    confusion not just about the flow, but about exactly what this code does and why.
    
    OK, sounds great.  Yeah, it would be really good to have something explaining the different scenarios; I'm often worried when I'm looking at this code that I'm forgetting about some weird case (e.g. the AM restarts after 7 days but everything else stays up, etc.)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223590938
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    --- End diff --
    
    Why not a typesafe Option here?
    If `createAndUpdateTokens()` is extracted, this would always be defined.
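    
    e.g., a rough sketch of the Option variant:
    
        // encode "renewal may be disabled" in the type instead of a nullable field
        private val renewalExecutor: Option[ScheduledExecutorService] =
          if (principal != null) {
            Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread"))
          } else {
            None
          }
        
        // call sites are then explicit about the no-renewal case:
        renewalExecutor.foreach { exec =>
          exec.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
            TimeUnit.SECONDS)
        }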



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228628515
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    --- End diff --
    
    I actually dislike that this check exists in so many places... but sure I'll make it consistent.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228400837
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver to send the newly generated tokens to.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driver.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    +        driver.send(UpdateDelegationTokens(tokens))
    +      }
    +
    +      // Transfer the original user's tokens to the new user, since they may contain needed tokens
    +      // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +      // exist in the current user's credentials, since those were freshly obtained above
    +      // (see SPARK-23361).
    +      val existing = ugi.getCredentials()
    +      existing.mergeAll(originalCreds)
    +      ugi.addCredentials(existing)
    +      ugi
    +    } else {
    +      driver.foreach { ref =>
    --- End diff --
    
    and it would be nice to use the same naming as above here as well, e.g. maybe `driverOpt.foreach { driver =>`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97173 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97173/testReport)** for PR 22624 at commit [`638bb07`](https://github.com/apache/spark/commit/638bb0700d3faa6753a857f70035fce8f95b6a8b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    I merged start() with createAndUpdateTokens() in the last commit; the API is still awkward because of YARN's usage, but it simplifies the Mesos code a little bit. It can be easily reverted in any case.
    
    Tested on Mesos, testing on YARN at the moment.
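    
    For reference, the call sites now look roughly like this (simplified; `tokenManager` and `driverEndpointRef` are placeholder names, not the actual fields):
    
        // Keytab mode (e.g. the YARN AM): start() logs in the principal and
        // schedules periodic renewal; the returned UGI runs the app code.
        val ugi = tokenManager.start()
        
        // Non-renewal mode (e.g. the Mesos driver): pass the driver ref so an
        // initial set of tokens is created and shipped once.
        tokenManager.start(Some(driverEndpointRef))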


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3827/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97623 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97623/testReport)** for PR 22624 at commit [`b3a282e`](https://github.com/apache/spark/commit/b3a282e9d9cc0a6202091603e84c1ce0d50269b7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r227073121
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver where to send the newly generated tokens.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driver.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    --- End diff --
    
    > makes it simple to merely update the secret 
    
    The problem is that you still need Spark to load the updated secrets, and where is that logic?
    
    I'd rather keep all backends working the same way unless there is a good reason for them not to. If there is a good reason, then later you can make this change when you actually write code to implement that. But right now I don't see a reason for it.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by ifilonenko <gi...@git.apache.org>.
Github user ifilonenko commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Thank you for this work @vanzin ! 


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228630414
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala ---
    @@ -18,45 +18,20 @@
     package org.apache.spark.deploy.k8s.security
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +import org.apache.hadoop.security.UserGroupInformation
     
     import org.apache.spark.SparkConf
    -import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.internal.Logging
     
     /**
    - * The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens
    - * on the behalf of the Kubernetes submission client. The new credentials
    - * (called Tokens when they are serialized) are stored in Secrets accessible
    - * to the driver and executors, when new Tokens are received they overwrite the current Secrets.
    + * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
      */
     private[spark] class KubernetesHadoopDelegationTokenManager(
    --- End diff --
    
    This class (and a whole lot of code on the k8s side) is actually in my crosshairs for further cleanup; but that would mean changes that are unrelated to this one, so separate.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223398529
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
    @@ -14,147 +14,39 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -
     package org.apache.spark.scheduler.cluster.mesos
     
    -import java.security.PrivilegedExceptionAction
    -import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    -
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    -import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.deploy.security.AbstractCredentialRenewer
     import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.internal.{config, Logging}
     import org.apache.spark.rpc.RpcEndpointRef
    -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    -import org.apache.spark.ui.UIUtils
    -import org.apache.spark.util.ThreadUtils
    -
     
     /**
    - * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf
    - * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
    - * and similarly will renew the Credentials when 75% of the renewal interval has passed.
    - * The principal difference is that instead of writing the new credentials to HDFS and
    - * incrementing the timestamp of the file, the new credentials (called Tokens when they are
    - * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
    - * received they overwrite the current credentials.
    + * Mesos-specific implementation of AbstractCredentialRenewer.
      */
     private[spark] class MesosHadoopDelegationTokenManager(
    --- End diff --
    
    What about calling it MesosCredentialRenewer (following the naming of its base class and of AMCredentialRenewer)?
    Especially since it already has a HadoopDelegationTokenManager member with a different responsibility; a sketch of the rename follows below.
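
    A schematic sketch of the rename (the body is illustrative, not the PR's code, and the delegation assumes the current two-argument `obtainDelegationTokens` signature):

    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.Credentials

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.security.{AbstractCredentialRenewer, HadoopDelegationTokenManager}

    private[spark] class MesosCredentialRenewer(
        sparkConf: SparkConf,
        hadoopConf: Configuration)
      extends AbstractCredentialRenewer(sparkConf, hadoopConf) {

      // The manager member keeps its token-fetching responsibility; the renewer
      // name then describes what this class itself does.
      private val tokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)

      override protected def obtainDelegationTokens(creds: Credentials): Long =
        tokenManager.obtainDelegationTokens(hadoopConf, creds)
    }
    ```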


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97164/
    Test PASSed.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223796403
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
    @@ -34,15 +34,9 @@ private[spark] class MesosHadoopDelegationTokenManager(
     
       private val tokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
     
    -  def start(driverEndpoint: RpcEndpointRef): Unit = {
    -    require(driverEndpoint != null, "DriverEndpoint is not initialized")
    -    setDriverRef(driverEndpoint)
    -    if (renewalEnabled) {
    -      super.start()
    -    } else {
    -      logInfo("Using ticket cache for Kerberos authentication, no token renewal.")
    -      createAndUpdateTokens()
    -    }
    +  override def start(driver: Option[RpcEndpointRef]): UserGroupInformation = {
    +    require(driver.orNull != null, "Driver endpoint is not initialized")
    --- End diff --
    
    What about `driver.isDefined` instead of `driver.orNull != null`?
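
    That is:

    ```scala
    // The same precondition without the orNull round-trip.
    require(driver.isDefined, "Driver endpoint is not initialized")
    ```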


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223622626
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    +    if (principal != null) {
    +      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +    } else {
    +      null
    +    }
    +
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
    +
    +  protected def renewalEnabled: Boolean = principal != null
    +
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
    +   */
    +  def start(): UserGroupInformation = {
    +    require(renewalEnabled, "Token renewal is disabled.")
    +
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
    +      }
    +    }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
    +
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    val driver = driverRef.get()
    +    if (driver != null) {
    --- End diff --
    
    What about extending this start() method with a boolean flag, say sendingInitialToken, so it would be clear why this functionality is here? That way the behavior would not be controlled by whether the driver happened to register beforehand, and for Mesos, where this is a must-have, the require could be used in the if body as well. A rough sketch follows.
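
    Roughly, as a sketch inside AbstractCredentialRenewer (the flag name is a suggestion and the `???` placeholder stands for the existing login/renewal logic):

    ```scala
    def start(sendingInitialToken: Boolean): UserGroupInformation = {
      require(renewalEnabled, "Token renewal is disabled.")
      if (sendingInitialToken) {
        // Mesos would pass true, making the dependency on a registered driver explicit.
        require(driverRef.get() != null, "Driver endpoint not set.")
      }
      ??? // log in, obtain tokens, and schedule renewal as in the current start()
    }
    ```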


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223767588
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    +    if (principal != null) {
    +      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +    } else {
    +      null
    +    }
    +
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
    +
    +  protected def renewalEnabled: Boolean = principal != null
    +
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
    +   */
    +  def start(): UserGroupInformation = {
    +    require(renewalEnabled, "Token renewal is disabled.")
    +
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
    +      }
    +    }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
    +
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    val driver = driverRef.get()
    +    if (driver != null) {
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +      driver.send(UpdateDelegationTokens(tokens))
    +    }
    +
    +    // Transfer the original user's tokens to the new user, since it may contain needed tokens
    +    // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +    // exist in the current user's credentials, since those were freshly obtained above
    +    // (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    if (renewalExecutor != null) {
    +      renewalExecutor.shutdown()
    +    }
    +  }
    +
    +  /** Create new tokens for the current user and distribute them to the driver. */
    +  protected def createAndUpdateTokens(): Unit = {
    +    val driver = driverRef.get()
    +    require(driver != null, "Driver endpoint not set.")
    +
    +    val creds = new Credentials()
    +    obtainDelegationTokens(creds)
    +    UserGroupInformation.getCurrentUser.addCredentials(creds)
    +
    +    val tokens = SparkHadoopUtil.get.serialize(creds)
    +    driver.send(UpdateDelegationTokens(tokens))
    +  }
    +
    +  /**
    +   * Fetch new delegation tokens for configured services, storing them in the given credentials.
    +   *
    +   * @return The time by which the tokens must be renewed.
    +   */
    +  protected def obtainDelegationTokens(creds: Credentials): Long
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
    +
    +    val renewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        updateTokensTask()
    +      }
    +    }
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
    +  }
    +
    +  /**
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
    +   */
    +  private def updateTokensTask(): Unit = {
    +    try {
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
    +      } else {
    +        // This shouldn't really happen, since the driver should register way before tokens expire.
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
    +      }
    +    } catch {
    +      case e: Exception =>
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
    +    }
    +  }
    +
    +  /**
    +   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
    +   * new tokens before the new set expires.
    +   *
    +   * @return Credentials containing the new tokens.
    +   */
    +  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
    +    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
    +      override def run(): Credentials = {
    +        val creds = new Credentials()
    +        val nextRenewal = obtainDelegationTokens(creds)
    +
    +        // Calculate the time when new credentials should be created, based on the configured
    +        // ratio.
    +        val now = System.currentTimeMillis
    +        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
    --- End diff --
    
    What is the advantage of stashing it in a variable? It just decouples the code that reads the config from the code that uses it.
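
    For reference, a sketch of the scheduling math under discussion (not the PR's exact code; `nextRenewal` stands for the provider-reported renewal deadline in milliseconds):

    ```scala
    // New tokens are fetched once the configured ratio (0.75 by default) of the
    // tokens' renewal interval has elapsed.
    def renewalDelayMs(nextRenewal: Long, ratio: Double = 0.75): Long = {
      val now = System.currentTimeMillis()
      math.max(0L, ((nextRenewal - now) * ratio).toLong)
    }
    ```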


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by ifilonenko <gi...@git.apache.org>.
Github user ifilonenko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228605725
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala ---
    @@ -18,45 +18,20 @@
     package org.apache.spark.deploy.k8s.security
     
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +import org.apache.hadoop.security.UserGroupInformation
     
     import org.apache.spark.SparkConf
    -import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.internal.Logging
     
     /**
    - * The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens
    - * on the behalf of the Kubernetes submission client. The new credentials
    - * (called Tokens when they are serialized) are stored in Secrets accessible
    - * to the driver and executors, when new Tokens are received they overwrite the current Secrets.
    + * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
      */
     private[spark] class KubernetesHadoopDelegationTokenManager(
    --- End diff --
    
    It's still needed for mocking purposes in unit testing, in the code's current state, and it will be adapted to override `start()` in an upcoming PR. So it remains relevant to keep; see the sketch below.
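
    For illustration only, a test might stub it roughly like this inside a suite using Mockito (the actual k8s test code is not shown in this thread):

    ```scala
    import org.mockito.Mockito.mock

    import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager

    // A mock stands in for the manager so unit tests avoid real Kerberos logins.
    val tokenManager = mock(classOf[KubernetesHadoopDelegationTokenManager])
    ```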


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97173/
    Test PASSed.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223624353
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    +    if (principal != null) {
    +      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +    } else {
    +      null
    +    }
    +
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
    +
    +  protected def renewalEnabled: Boolean = principal != null
    +
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
    +   */
    +  def start(): UserGroupInformation = {
    +    require(renewalEnabled, "Token renewal is disabled.")
    +
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
    +      }
    +    }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
    +
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    val driver = driverRef.get()
    +    if (driver != null) {
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +      driver.send(UpdateDelegationTokens(tokens))
    +    }
    +
    +    // Transfer the original user's tokens to the new user, since it may contain needed tokens
    +    // (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
    +    // exist in the current user's credentials, since those were freshly obtained above
    +    // (see SPARK-23361).
    +    val existing = ugi.getCredentials()
    +    existing.mergeAll(originalCreds)
    +    ugi.addCredentials(existing)
    +    ugi
    +  }
    +
    +  def stop(): Unit = {
    +    if (renewalExecutor != null) {
    +      renewalExecutor.shutdown()
    +    }
    +  }
    +
    +  /** Create new tokens for the current user and distribute them to the driver. */
    +  protected def createAndUpdateTokens(): Unit = {
    +    val driver = driverRef.get()
    +    require(driver != null, "Driver endpoint not set.")
    +
    +    val creds = new Credentials()
    +    obtainDelegationTokens(creds)
    +    UserGroupInformation.getCurrentUser.addCredentials(creds)
    +
    +    val tokens = SparkHadoopUtil.get.serialize(creds)
    +    driver.send(UpdateDelegationTokens(tokens))
    +  }
    +
    +  /**
    +   * Fetch new delegation tokens for configured services, storing them in the given credentials.
    +   *
    +   * @return The time by which the tokens must be renewed.
    +   */
    +  protected def obtainDelegationTokens(creds: Credentials): Long
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
    +
    +    val renewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        updateTokensTask()
    +      }
    +    }
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
    +  }
    +
    +  /**
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
    +   */
    +  private def updateTokensTask(): Unit = {
    +    try {
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
    +      } else {
    +        // This shouldn't really happen, since the driver should register way before tokens expire.
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
    +      }
    +    } catch {
    +      case e: Exception =>
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
    +    }
    +  }
    +
    +  /**
    +   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
    +   * new tokens before the new set expires.
    +   *
    +   * @return Credentials containing the new tokens.
    +   */
    +  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
    +    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
    +      override def run(): Credentials = {
    +        val creds = new Credentials()
    +        val nextRenewal = obtainDelegationTokens(creds)
    +
    +        // Calculate the time when new credentials should be created, based on the configured
    +        // ratio.
    +        val now = System.currentTimeMillis
    +        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
    --- End diff --
    
    Why read this config value every time? (In other places I have seen this extracted to a member val, as sketched below.)
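
    The extraction would be a one-liner in the class body, e.g.:

    ```scala
    // Read the ratio once at construction time instead of on every renewal pass.
    private val renewalRatio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
    ```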


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin closed the pull request at:

    https://github.com/apache/spark/pull/22624


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---




[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by ifilonenko <gi...@git.apache.org>.
Github user ifilonenko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r227064934
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver where to send the newly generated tokens.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driver.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    --- End diff --
    
    Can these tokens be packaged in a tuple or case class with the UGI, i.e. `(tokens, ugi)`, as the response of this method?
    The reason being: I know it is preferred to use driver.send(token), but the logic in Kubernetes makes it simple to merely update the secret (updates to Secrets are automatically detected by the driver and executor pods, and this logic already runs natively in the k8s API). Unless we are being strict about being idiomatic to Spark, I think leveraging the benefits of Kubernetes here would be beneficial. I'll leave it up to you to decide how executors should detect a newly updated token.
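
    A minimal sketch of the pairing (the class name is a placeholder):

    ```scala
    import org.apache.hadoop.security.UserGroupInformation

    // Hypothetical: bundle the serialized tokens with the logged-in user so the
    // k8s backend could write the tokens into a Secret itself.
    case class TokensAndUser(tokens: Array[Byte], ugi: UserGroupInformation)
    ```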


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97506/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223493773
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    +    if (principal != null) {
    +      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +    } else {
    +      null
    +    }
    +
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
    +
    +  protected def renewalEnabled: Boolean = principal != null
    +
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
    +   */
    +  def start(): UserGroupInformation = {
    +    require(renewalEnabled, "Token renewal is disabled.")
    +
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
    +      }
    +    }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
    +
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    val driver = driverRef.get()
    +    if (driver != null) {
    --- End diff --
    
    YARN starts the token renewer before there is a driver reference available; the AM gets that later (when it connects back to the driver in client mode, or when the user code is started in cluster mode).
    
    I've been trying to come up with a better API here that would avoid these checks, but haven't figured it out yet.
    
    Basically for the YARN case you would need:
    
    - an initial call to login and fetch delegation tokens, before the driver is even known (the fix for SPARK-23361)
    - a call to start the renewal task, which needs information retrieved in the above call (the renewal interval)
    
    So breaking it into two calls would need some state to be kept between the calls. Not sure whether that would be much cleaner than what's here now.
    
    Trying to follow the Mesos way (where the driver itself manages the renewer, instead of the AM code in YARN) would probably not work in YARN, because the AM tries to connect to HDFS (for distributed cache functionality) before the driver is running, so it needs those tokens. It might be possible to rework some of the YARN backend internals to avoid that, I think, but it needs more thought and would be better left as a separate / future enhancement. 
    
    At some point I'd also like to change the client mode on YARN to be more like the Mesos case (where the driver manages the login, not the AM), maybe all this could be cleaned up at that time.
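    For illustration, a rough sketch of what that two-call split might look like
    (the names here are invented; the renewal time learned by the first call is
    exactly the state that would have to be kept for the second):
    
    ```
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    
    private[spark] trait TwoPhaseRenewerSketch {
      // State shared between the two phases.
      @volatile private var nextRenewalTime: Long = -1L
    
      /** Phase 1: log in and fetch tokens before the driver ref exists (SPARK-23361). */
      def initialLoginAndFetch(): UserGroupInformation = {
        val ugi = doLogin()
        val creds = new Credentials()
        nextRenewalTime = obtainDelegationTokens(creds)  // remembered for phase 2
        ugi.addCredentials(creds)
        ugi
      }
    
      /** Phase 2: once the driver registers, start the periodic renewal task. */
      def startRenewal(): Unit = {
        require(nextRenewalTime > 0, "initialLoginAndFetch() must be called first")
        scheduleRenewal(nextRenewalTime - System.currentTimeMillis())
      }
    
      // Supplied by the concrete renewer; signatures mirror the diff above.
      protected def doLogin(): UserGroupInformation
      protected def obtainDelegationTokens(creds: Credentials): Long
      protected def scheduleRenewal(delay: Long): Unit
    }
    ```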


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #98105 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98105/testReport)** for PR 22624 at commit [`da8be70`](https://github.com/apache/spark/commit/da8be70321dcc3d943535231d5eb26591f5115e0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223766345
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala ---
    @@ -14,147 +14,39 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -
     package org.apache.spark.scheduler.cluster.mesos
     
    -import java.security.PrivilegedExceptionAction
    -import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    -
     import org.apache.hadoop.conf.Configuration
    -import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    -import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.deploy.security.AbstractCredentialRenewer
     import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    -import org.apache.spark.internal.{config, Logging}
     import org.apache.spark.rpc.RpcEndpointRef
    -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    -import org.apache.spark.ui.UIUtils
    -import org.apache.spark.util.ThreadUtils
    -
     
     /**
    - * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf
    - * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
    - * and similarly will renew the Credentials when 75% of the renewal interval has passed.
    - * The principal difference is that instead of writing the new credentials to HDFS and
    - * incrementing the timestamp of the file, the new credentials (called Tokens when they are
    - * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
    - * received they overwrite the current credentials.
    + * Mesos-specific implementation of AbstractCredentialRenewer.
      */
     private[spark] class MesosHadoopDelegationTokenManager(
    --- End diff --
    
    That's the existing name and I'd rather not change it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #98238 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98238/testReport)** for PR 22624 at commit [`88fe6cb`](https://github.com/apache/spark/commit/88fe6cb9105d140d5ebb250baee2623bb07650ce).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    any more comments @ifilonenko @skonto ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r229081942
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -230,7 +242,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             val reply = SparkAppConfig(
               sparkProperties,
               SparkEnv.get.securityManager.getIOEncryptionKey(),
    -          fetchHadoopDelegationTokens())
    +          Option(delegationTokens.get()))
    --- End diff --
    
    >  But I was a bit hazy on what happens after that
    
    The RPC layer is basically a message queue. So first message to be sent = first message to be processed.
    
    But as I said, I'll clean this up since now the code is all in `CoarseGrainedSchedulerBackend` and that makes it easier.
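    As a simplified model of that guarantee (an illustration, not Spark's actual
    RPC code): a single worker thread per endpoint drains its inbox in enqueue
    order, so the first message sent is the first message processed.
    
    ```
    import java.util.concurrent.Executors
    
    // One single-threaded executor per endpoint acts as a FIFO inbox.
    class InboxModel(handle: Any => Unit) {
      private val worker = Executors.newSingleThreadExecutor()
      def send(msg: Any): Unit = worker.execute(() => handle(msg))
    }
    ```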


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by ifilonenko <gi...@git.apache.org>.
Github user ifilonenko commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    If we are talking about the token renewal functionality, could we possibly refactor the `HadoopFSDelegationTokenProvider` as well. I found that within the function `obtainDelegationTokens()`:
    
    This code-block:
    ```
        val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds)
    
        // Get the token renewal interval if it is not set. It will only be called once.
        if (tokenRenewalInterval == null) {
          tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
        }
    ```
    calls `fetchDelegationTokens()` twice, since `tokenRenewalInterval` will always be null when the `TokenManager` is first created. That seems unnecessary in the case of Kubernetes, as it creates two sets of delegation tokens when only one is needed. I don't know if the use case is different in Mesos / YARN, but could this possibly be refactored to call `fetchDelegationTokens()` only once at startup, or to take a parameter specifying `tokenRenewalInterval`? I could send a follow-up PR if desired, but I'm not sure whether that fits within the scope of this PR.
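    For reference, a sketch of the refactor being proposed, reusing the names from
    the snippet above; `renewalIntervalFromTokens` is a hypothetical helper that
    would derive the interval from the credentials already fetched rather than
    fetching a second set:
    
    ```
    val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds)
    
    // Derive the interval from the tokens fetched above, so that
    // fetchDelegationTokens() runs only once at startup.
    if (tokenRenewalInterval == null) {
      tokenRenewalInterval = renewalIntervalFromTokens(fetchCreds, hadoopConf, sparkConf)
    }
    ```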


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #96915 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96915/testReport)** for PR 22624 at commit [`5b7604e`](https://github.com/apache/spark/commit/5b7604e77122b4c44048b14e8be47d17eeffd866).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    merged to master


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #98238 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98238/testReport)** for PR 22624 at commit [`88fe6cb`](https://github.com/apache/spark/commit/88fe6cb9105d140d5ebb250baee2623bb07650ce).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by ifilonenko <gi...@git.apache.org>.
Github user ifilonenko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r227075160
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driver If provided, the driver endpoint to which newly generated tokens are sent.
    +   *               The same ref will also receive future token updates unless overridden later.
    +   * @return The newly logged-in user, or null if a principal is not configured.
    +   */
    +  def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driver.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    --- End diff --
    
    > The problem is that you still need Spark to load the updated secrets, and where is that logic?
    
    Agreed, it isn't there yet. Such a change could be added later, but it would require a slight refactor of this method, which I wanted to pre-empt. Sounds good!
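    A hypothetical sketch of the missing executor-side piece discussed here: poll
    the token file that Kubernetes projects from the Secret and reload it when its
    modification time changes. The path and the reload hook are assumptions.
    
    ```
    import java.io.File
    import java.util.concurrent.{Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicLong
    
    def watchTokenFile(path: String, reload: File => Unit): Unit = {
      val f = new File(path)
      val lastModified = new AtomicLong(f.lastModified())
      val poller = Executors.newSingleThreadScheduledExecutor()
      poller.scheduleWithFixedDelay(() => {
        val m = f.lastModified()
        // A Secret update surfaces as a new mtime on the projected file.
        if (lastModified.getAndSet(m) != m) reload(f)
      }, 1, 1, TimeUnit.MINUTES)
    }
    ```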


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3664/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97163 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97163/testReport)** for PR 22624 at commit [`ef8628a`](https://github.com/apache/spark/commit/ef8628ac1a3059bb4871ebd8f1daa6deacf5d7f3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228630089
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -110,32 +209,105 @@ private[spark] class HadoopDelegationTokenManager(
       }
     
       /**
    -   * Get delegation token provider for the specified service.
    +   * List of file systems for which to obtain delegation tokens. The base implementation
    +   * returns just the default file system in the given Hadoop configuration.
        */
    -  def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
    -    delegationTokenProviders.get(service)
    +  protected def fileSystemsToAccess(): Set[FileSystem] = {
    +    Set(FileSystem.get(hadoopConf))
    +  }
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(_delay)}.")
    +
    +    val renewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        updateTokensTask()
    +      }
    +    }
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
       }
     
       /**
    -   * Writes delegation tokens to creds.  Delegation tokens are fetched from all registered
    -   * providers.
    -   *
    -   * @param hadoopConf hadoop Configuration
    -   * @param creds Credentials that will be updated in place (overwritten)
    -   * @return Time after which the fetched delegation tokens should be renewed.
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
        */
    -  def obtainDelegationTokens(
    -      hadoopConf: Configuration,
    -      creds: Credentials): Long = {
    -    delegationTokenProviders.values.flatMap { provider =>
    -      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
    -        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
    +  private def updateTokensTask(): Unit = {
    +    try {
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
           } else {
    -        logDebug(s"Service ${provider.serviceName} does not require a token." +
    -          s" Check your configuration to see if security is disabled or not.")
    -        None
    +        // This shouldn't really happen, since the driver should register way before tokens expire.
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
           }
    -    }.foldLeft(Long.MaxValue)(math.min)
    +    } catch {
    +      case e: Exception =>
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
    +    }
    +  }
    +
    +  /**
    +   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
    +   * new tokens before the new set expires.
    +   *
    +   * @return Credentials containing the new tokens.
    +   */
    +  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
    +    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
    +      override def run(): Credentials = {
    +        val creds = new Credentials()
    +        val nextRenewal = obtainDelegationTokens(creds)
    +
    +        // Calculate the time when new credentials should be created, based on the configured
    +        // ratio.
    +        val now = System.currentTimeMillis
    +        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
    +        val adjustedNextRenewal = (now + (ratio * (nextRenewal - now))).toLong
    +
    +        scheduleRenewal(adjustedNextRenewal - now)
    +        creds
    +      }
    +    })
    +  }
    +
    +  private def doLogin(): UserGroupInformation = {
    +    logInfo(s"Attempting to login to KDC using principal: $principal")
    +    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    +    logInfo("Successfully logged into KDC.")
    +    ugi
    +  }
    +
    +  private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
    +    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
    +      safeCreateProvider(new HiveDelegationTokenProvider) ++
    +      safeCreateProvider(new HBaseDelegationTokenProvider)
    +
    +    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    +    providers
    +      .filter { p => isServiceEnabled(p.serviceName) }
    +      .map { p => (p.serviceName, p) }
    +      .toMap
       }
    -}
     
    +  private def safeCreateProvider(
    +      createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
    +    try {
    +      Some(createFn)
    +    } catch {
    +      case t: Throwable =>
    +        logDebug(s"Failed to load built in provider.", t)
    --- End diff --
    
    I think `NoClassDefFoundError` is the main one, but I'm not sure whether others may occur. A StackOverflowError or OOM here is very unlikely, so it's a pretty small risk to leave this as is.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Trying more pings: @jerryshao @tgravescs 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r229031795
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -17,76 +17,175 @@
     
     package org.apache.spark.deploy.security
     
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.fs.FileSystem
    -import org.apache.hadoop.security.Credentials
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
     
     import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
     
     /**
    - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
    - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
    - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
    - * explicitly disabled.
    + * Manager for delegation tokens in a Spark application.
    + *
    + * This manager has two modes of operation:
    + *
    + * 1.  When configured with a principal and a keytab, it will make sure long-running apps can run
    + * without interruption while accessing secured services. It periodically logs in to the KDC with
    + * user-provided credentials, and contacts all the configured secure services to obtain delegation
    + * tokens to be distributed to the rest of the application.
    + *
    + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
    + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op
    + * when the relogin is not yet needed. The check period can be overridden in the configuration.
      *
    - * Also, each HadoopDelegationTokenProvider is controlled by
    - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
    - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
    - * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
      *
    - * @param sparkConf Spark configuration
    - * @param hadoopConf Hadoop configuration
    - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
    + * 2. When operating without an explicit principal and keytab, token renewal will not be available.
    + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark
    + * driver, but the app will not get new tokens when those expire.
    + *
    + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
    + * method. This option does not require calling the `start` method, but leaves it up to the
    + * caller to distribute the tokens that were generated.
      */
     private[spark] class HadoopDelegationTokenManager(
    -    sparkConf: SparkConf,
    -    hadoopConf: Configuration,
    -    fileSystems: Configuration => Set[FileSystem])
    -  extends Logging {
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
     
       private val deprecatedProviderEnabledConfigs = List(
         "spark.yarn.security.tokens.%s.enabled",
         "spark.yarn.security.credentials.%s.enabled")
       private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
     
    -  // Maintain all the registered delegation token providers
    -  private val delegationTokenProviders = getDelegationTokenProviders
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  require((principal == null) == (keytab == null),
    +    "Both principal and keytab must be defined, or neither.")
    +  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +
    +  private val delegationTokenProviders = loadProviders()
       logDebug("Using the following builtin delegation token providers: " +
         s"${delegationTokenProviders.keys.mkString(", ")}.")
     
    -  /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
    -  def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
    -    this(
    -      sparkConf,
    -      hadoopConf,
    -      hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
    +  private var renewalExecutor: ScheduledExecutorService = _
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
       }
     
    -  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    -    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
    -      safeCreateProvider(new HiveDelegationTokenProvider) ++
    -      safeCreateProvider(new HBaseDelegationTokenProvider)
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * When token renewal is not enabled, this method will not start any periodic tasks. Instead, it
    +   * will generate tokens if the driver ref has been provided, update the current user, and send
    +   * those tokens to the driver. No future tokens will be generated, so when that initial set
    +   * expires, the app will stop working.
    +   *
    +   * @param driverEndpoint If provided, the driver endpoint to which newly generated tokens
    +   *                       are sent. The same ref will also receive future token updates
    +   *                       unless overridden later.
    +   * @return The newly logged-in user, or null if a principal is not configured.
    +   */
    +  def start(driverEndpoint: Option[RpcEndpointRef] = None): UserGroupInformation = {
    +    driverEndpoint.foreach(setDriverRef)
     
    -    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    -    providers
    -      .filter { p => isServiceEnabled(p.serviceName) }
    -      .map { p => (p.serviceName, p) }
    -      .toMap
    +    if (principal != null) {
    +      renewalExecutor =
    +        ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +
    +      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +      val ugi = doLogin()
    +
    +      val tgtRenewalTask = new Runnable() {
    +        override def run(): Unit = {
    +          ugi.checkTGTAndReloginFromKeytab()
    +        }
    +      }
    +      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +        TimeUnit.SECONDS)
    +
    +      val creds = obtainTokensAndScheduleRenewal(ugi)
    +      ugi.addCredentials(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        val tokens = SparkHadoopUtil.get.serialize(creds)
    +        driver.send(UpdateDelegationTokens(tokens))
    +      }
    +
    +      // Transfer the original user's tokens to the new user, since they may contain needed
    +      // tokens (such as those used to connect to YARN). Explicitly avoid overwriting tokens that
    +      // exist in the current user's credentials, since those were freshly obtained above
    +      // (see SPARK-23361).
    +      val existing = ugi.getCredentials()
    +      existing.mergeAll(originalCreds)
    +      ugi.addCredentials(existing)
    +      ugi
    +    } else {
    +      driverEndpoint.foreach { driver =>
    --- End diff --
    
    This is the part of the current API that I dislike the most, but I really don't know how to make it better without fixing SPARK-25689 first.
    
    This at least keeps a single method to initialize delegation tokens, instead of having logic in the call sites about what method to call.
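    To make the two modes concrete, a call-site sketch (illustrative only;
    `runDriver()` stands in for the backend's startup logic):
    
    ```
    import java.security.PrivilegedExceptionAction
    
    val ugi = tokenManager.start(Some(driverEndpoint))
    if (ugi != null) {
      // Principal + keytab configured: run as the freshly logged-in user,
      // with token renewal handled in the background.
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = runDriver()
      })
    } else {
      // No principal: start() already sent one set of tokens to the driver;
      // the app runs as the current user until those tokens expire.
      runDriver()
    }
    ```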


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223616283
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
    @@ -60,8 +60,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
         with org.apache.mesos.Scheduler with MesosSchedulerUtils {
     
    -  private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
    -    new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)
    +  private val tokenManager: MesosHadoopDelegationTokenManager = {
    --- End diff --
    
    Option?
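    Presumably this means wrapping the manager in an `Option` when security is
    off, along these lines (illustrative; the actual guard condition and
    constructor arguments are not visible in this hunk):
    
    ```
    private val tokenManager: Option[MesosHadoopDelegationTokenManager] =
      if (UserGroupInformation.isSecurityEnabled) {
        Some(new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
      } else {
        None
      }
    ```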


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r228398765
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
    @@ -110,32 +209,105 @@ private[spark] class HadoopDelegationTokenManager(
       }
     
       /**
    -   * Get delegation token provider for the specified service.
    +   * List of file systems for which to obtain delegation tokens. The base implementation
    +   * returns just the default file system in the given Hadoop configuration.
        */
    -  def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
    -    delegationTokenProviders.get(service)
    +  protected def fileSystemsToAccess(): Set[FileSystem] = {
    +    Set(FileSystem.get(hadoopConf))
    +  }
    +
    +  private def scheduleRenewal(delay: Long): Unit = {
    +    val _delay = math.max(0, delay)
    +    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(_delay)}.")
    +
    +    val renewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        updateTokensTask()
    +      }
    +    }
    +    renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
       }
     
       /**
    -   * Writes delegation tokens to creds.  Delegation tokens are fetched from all registered
    -   * providers.
    -   *
    -   * @param hadoopConf hadoop Configuration
    -   * @param creds Credentials that will be updated in place (overwritten)
    -   * @return Time after which the fetched delegation tokens should be renewed.
    +   * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
    +   * to fetch the next set of tokens when needed.
        */
    -  def obtainDelegationTokens(
    -      hadoopConf: Configuration,
    -      creds: Credentials): Long = {
    -    delegationTokenProviders.values.flatMap { provider =>
    -      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
    -        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
    +  private def updateTokensTask(): Unit = {
    +    try {
    +      val freshUGI = doLogin()
    +      val creds = obtainTokensAndScheduleRenewal(freshUGI)
    +      val tokens = SparkHadoopUtil.get.serialize(creds)
    +
    +      val driver = driverRef.get()
    +      if (driver != null) {
    +        logInfo("Updating delegation tokens.")
    +        driver.send(UpdateDelegationTokens(tokens))
           } else {
    -        logDebug(s"Service ${provider.serviceName} does not require a token." +
    -          s" Check your configuration to see if security is disabled or not.")
    -        None
    +        // This shouldn't really happen, since the driver should register way before tokens expire.
    +        logWarning("Delegation tokens close to expiration but no driver has registered yet.")
    +        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
           }
    -    }.foldLeft(Long.MaxValue)(math.min)
    +    } catch {
    +      case e: Exception =>
    +        val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
    +        logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
    +          " If this happens too often tasks will fail.", e)
    +        scheduleRenewal(delay)
    +    }
    +  }
    +
    +  /**
    +   * Obtain new delegation tokens from the available providers. Schedules a new task to fetch
    +   * new tokens before the new set expires.
    +   *
    +   * @return Credentials containing the new tokens.
    +   */
    +  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
    +    ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
    +      override def run(): Credentials = {
    +        val creds = new Credentials()
    +        val nextRenewal = obtainDelegationTokens(creds)
    +
    +        // Calculate the time when new credentials should be created, based on the configured
    +        // ratio.
    +        val now = System.currentTimeMillis
    +        val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
    +        val adjustedNextRenewal = (now + (ratio * (nextRenewal - now))).toLong
    +
    +        scheduleRenewal(adjustedNextRenewal - now)
    +        creds
    +      }
    +    })
    +  }
    +
    +  private def doLogin(): UserGroupInformation = {
    +    logInfo(s"Attempting to login to KDC using principal: $principal")
    +    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    +    logInfo("Successfully logged into KDC.")
    +    ugi
    +  }
    +
    +  private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
    +    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
    +      safeCreateProvider(new HiveDelegationTokenProvider) ++
    +      safeCreateProvider(new HBaseDelegationTokenProvider)
    +
    +    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    +    providers
    +      .filter { p => isServiceEnabled(p.serviceName) }
    +      .map { p => (p.serviceName, p) }
    +      .toMap
       }
    -}
     
    +  private def safeCreateProvider(
    +      createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
    +    try {
    +      Some(createFn)
    +    } catch {
    +      case t: Throwable =>
    +        logDebug(s"Failed to load built in provider.", t)
    --- End diff --
    
    I know this is old code, but do you know why this is catching `Throwable` and not `Exception`? Just for `NoClassDefFoundError`? I wonder if we should special-case that ... I worry about trying to recover from a StackOverflowError or an OOM.
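    If we did want to special-case it, one idiomatic option is Scala's `NonFatal`
    extractor, which deliberately does not match VirtualMachineErrors such as OOM
    and StackOverflowError. Note that `NoClassDefFoundError` is a `LinkageError`,
    which `NonFatal` also skips, so it would need its own case -- a sketch:
    
    ```
    import scala.util.control.NonFatal
    
    private def safeCreateProvider(
        createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
      try {
        Some(createFn)
      } catch {
        // Missing optional dependencies surface as LinkageErrors, which
        // NonFatal intentionally does not match, so handle them explicitly.
        case e: NoClassDefFoundError =>
          logDebug("Failed to load built in provider.", e)
          None
        case NonFatal(e) =>
          logDebug("Failed to load built in provider.", e)
          None
        // OutOfMemoryError, StackOverflowError, etc. propagate unchanged.
      }
    }
    ```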


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3665/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4135/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Merge token renewer functiona...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r229032985
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -230,7 +242,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             val reply = SparkAppConfig(
               sparkProperties,
               SparkEnv.get.securityManager.getIOEncryptionKey(),
    -          fetchHadoopDelegationTokens())
    +          Option(delegationTokens.get()))
    --- End diff --
    
    Although it's not obvious I think this is fine.
    
    The token manager is started before any rm-specific code runs and requests executors (in the `CoarseGrainedSchedulerBackend.start()` method; rm-specific schedulers call that before they do anything). So the message to set these tokens is the first message queued on the driver, and this reference will already be set when the first executor asks for this info.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Merge token renewer functionality in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4068/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223767408
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
    @@ -60,8 +60,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
         with org.apache.mesos.Scheduler with MesosSchedulerUtils {
     
    -  private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
    -    new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)
    +  private val tokenManager: MesosHadoopDelegationTokenManager = {
    --- End diff --
    
    Just adds unnecessary indirection.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    **[Test build #97163 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97163/testReport)** for PR 22624 at commit [`ef8628a`](https://github.com/apache/spark/commit/ef8628ac1a3059bb4871ebd8f1daa6deacf5d7f3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22624: [SPARK-23781][CORE] Add base class for token rene...

Posted by ifilonenko <gi...@git.apache.org>.
Github user ifilonenko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22624#discussion_r223472701
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/AbstractCredentialRenewer.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.security
    +
    +import java.io.File
    +import java.security.PrivilegedExceptionAction
    +import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
    +import org.apache.spark.ui.UIUtils
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Base class for periodically updating delegation tokens needed by the application.
    + *
    + * When configured with a principal and a keytab, this manager will make sure long-running apps
    + * (such as Spark Streaming apps) can run without interruption while accessing secured services. It
    + * periodically logs in to the KDC with user-provided credentials, and contacts all the configured
    + * secure services to obtain delegation tokens to be distributed to the rest of the application.
    + *
    + * This class manages the Kerberos login, renewing the TGT when needed. Because the UGI API
    + * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
    + * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
    + * needed. The check period can be overridden in the configuration.
    + *
    + * New delegation tokens are created once 75% of the renewal interval of the original tokens has
    + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
    + * The driver is tasked with distributing the tokens to other processes that might need them.
    + *
    + * This class can also be used without a principal and keytab, in which case token renewal will
    + * not be available. It provides a different API in that case (see `createAndUpdateTokens()`), which
    + * automates the distribution of tokens to the different processes in the Spark app.
    + */
    +private[spark] abstract class AbstractCredentialRenewer(
    +    protected val sparkConf: SparkConf,
    +    protected val hadoopConf: Configuration) extends Logging {
    +
    +  private val principal = sparkConf.get(PRINCIPAL).orNull
    +  private val keytab = sparkConf.get(KEYTAB).orNull
    +
    +  if (principal != null) {
    +    require(keytab != null, "Kerberos principal specified without a keytab.")
    +    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
    +  }
    +
    +  private val renewalExecutor: ScheduledExecutorService =
    +    if (principal != null) {
    +      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
    +    } else {
    +      null
    +    }
    +
    +  private val driverRef = new AtomicReference[RpcEndpointRef]()
    +
    +  protected def setDriverRef(ref: RpcEndpointRef): Unit = {
    +    driverRef.set(ref)
    +  }
    +
    +  protected def renewalEnabled: Boolean = principal != null
    +
    +  /**
    +   * Start the token renewer. Upon start, if a principal has been configured, the renewer will:
    +   *
    +   * - log in the configured principal, and set up a task to keep that user's ticket renewed
    +   * - obtain delegation tokens from all available providers
    +   * - schedule a periodic task to update the tokens when needed.
    +   *
    +   * @return The newly logged in user.
    +   */
    +  def start(): UserGroupInformation = {
    +    require(renewalEnabled, "Token renewal is disabled.")
    +
    +    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
    +    val ugi = doLogin()
    +
    +    val tgtRenewalTask = new Runnable() {
    +      override def run(): Unit = {
    +        ugi.checkTGTAndReloginFromKeytab()
    +      }
    +    }
    +    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
    +    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
    +      TimeUnit.SECONDS)
    +
    +    val creds = obtainTokensAndScheduleRenewal(ugi)
    +    ugi.addCredentials(creds)
    +
    +    val driver = driverRef.get()
    +    if (driver != null) {
    --- End diff --
    
    Should a `require(driver.isDefined)` check be added here to ensure that the driver sends the updates?
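    
    As a sketch, the two behaviors in question (the diff is cut off above, so
    the `send` call and `tokens` value are illustrative; the field is an
    `AtomicReference` rather than an `Option`, so a null check stands in for
    `isDefined`):
    
        // As written in the patch: tolerate a not-yet-set driver reference.
        val driver = driverRef.get()
        if (driver != null) {
          driver.send(UpdateDelegationTokens(tokens))
        }
    
        // The stricter alternative: fail fast when the reference is unset.
        // This would break setups where the renewer starts before the driver
        // endpoint is registered, which is why the reference can be null here.
        require(driverRef.get() != null, "Driver endpoint not registered.")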


---



[GitHub] spark issue #22624: [SPARK-23781][CORE] Add base class for token renewal fun...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/22624
  
    Given the lack of reviews, let me ping some people: @skonto @jerryshao 


---
