Posted to commits@spark.apache.org by va...@apache.org on 2019/01/28 21:32:45 UTC

[spark] branch master updated: [SPARK-26595][CORE] Allow credential renewal based on kerberos ticket cache.

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a67dbf  [SPARK-26595][CORE] Allow credential renewal based on kerberos ticket cache.
2a67dbf is described below

commit 2a67dbfbd341af166b1c85904875f26a6dea5ba8
Author: Marcelo Vanzin <va...@cloudera.com>
AuthorDate: Mon Jan 28 13:32:34 2019 -0800

    [SPARK-26595][CORE] Allow credential renewal based on kerberos ticket cache.
    
    This change adds a new mode for credential renewal that does not require
    a keytab; it uses the local ticket cache instead, so it works while the
    user keeps the cache valid.
    
    This can be useful for, e.g., people running long spark-shell sessions where
    their kerberos login is kept up-to-date.
    
    The main change to enable this behavior is in HadoopDelegationTokenManager,
    with a small change in the HDFS token provider. The other changes are to avoid
    creating duplicate tokens when submitting the application to YARN; they allow
    the tokens from the scheduler to be sent to the YARN AM, reducing the round trips
    to HDFS.
    
    For that, the scheduler initialization code was changed a little bit so that
    the tokens are available when the YARN client is initialized. That basically
    takes care of a long-standing TODO that was in the code to clean up configuration
    propagation to the driver's RPC endpoint (in CoarseGrainedSchedulerBackend).
    
    Tested with an app designed to stress this functionality, with both keytab and
    cache-based logins. Some basic kerberos tests were also run on k8s.
    
    Closes #23525 from vanzin/SPARK-26595.
    
    Authored-by: Marcelo Vanzin <va...@cloudera.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
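
A minimal sketch of what enabling the new mode could look like from application code,
assuming a valid TGT already sits in the local ticket cache (obtained with kinit, for
example). Only the spark.kerberos.renewal.credentials key comes from this patch; the
application name and the SparkSession-based setup are illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object CcacheRenewalSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("long-running-ccache-demo")  // illustrative name
          // New option from this patch: renew delegation tokens from the local
          // Kerberos credential cache instead of a principal/keytab pair.
          .set("spark.kerberos.renewal.credentials", "ccache")

        val spark = SparkSession.builder().config(conf).getOrCreate()
        // ... long-running work; the user keeps the TGT fresh externally
        // (e.g. by re-running kinit before the ticket expires).
        spark.stop()
      }
    }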
---
 .../security/HadoopDelegationTokenManager.scala    | 38 +++++++++++++++-----
 .../security/HadoopFSDelegationTokenProvider.scala | 34 +++++++++---------
 .../org/apache/spark/internal/config/package.scala | 10 ++++++
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 42 ++++++++--------------
 docs/security.md                                   | 32 ++++++++++++-----
 .../k8s/KubernetesClusterSchedulerBackend.scala    | 12 +++----
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala |  5 ++-
 .../org/apache/spark/deploy/yarn/Client.scala      | 25 ++++++-------
 .../cluster/YarnClientSchedulerBackend.scala       |  8 ++---
 .../cluster/YarnClusterSchedulerBackend.scala      |  2 ++
 .../scheduler/cluster/YarnSchedulerBackend.scala   | 15 ++++----
 11 files changed, 123 insertions(+), 100 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 2763a46..487291e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 /**
  * Manager for delegation tokens in a Spark application.
  *
- * When configured with a principal and a keytab, this manager will make sure long-running apps can
+ * When delegation token renewal is enabled, this manager will make sure long-running apps can
  * run without interruption while accessing secured services. It periodically logs in to the KDC
  * with user-provided credentials, and contacts all the configured secure services to obtain
  * delegation tokens to be distributed to the rest of the application.
@@ -50,6 +50,11 @@ import org.apache.spark.util.{ThreadUtils, Utils}
  * elapsed. The new tokens are sent to the Spark driver endpoint. The driver is tasked with
  * distributing the tokens to other processes that might need them.
  *
+ * Renewal can be enabled in two different ways: by providing a principal and keytab to Spark, or by
+ * enabling renewal based on the local credential cache. The latter has the drawback that Spark
+ * can't create new TGTs by itself, so the user has to manually update the Kerberos ticket cache
+ * externally.
+ *
  * This class can also be used just to create delegation tokens, by calling the
  * `obtainDelegationTokens` method. This option does not require calling the `start` method nor
  * providing a driver reference, but leaves it up to the caller to distribute the tokens that were
@@ -81,7 +86,11 @@ private[spark] class HadoopDelegationTokenManager(
   private var renewalExecutor: ScheduledExecutorService = _
 
   /** @return Whether delegation token renewal is enabled. */
-  def renewalEnabled: Boolean = principal != null
+  def renewalEnabled: Boolean = sparkConf.get(KERBEROS_RENEWAL_CREDENTIALS) match {
+    case "keytab" => principal != null
+    case "ccache" => UserGroupInformation.getCurrentUser().hasKerberosCredentials()
+    case _ => false
+  }
 
   /**
    * Start the token renewer. Requires a principal and keytab. Upon start, the renewer will
@@ -121,7 +130,7 @@ private[spark] class HadoopDelegationTokenManager(
 
   def stop(): Unit = {
     if (renewalExecutor != null) {
-      renewalExecutor.shutdown()
+      renewalExecutor.shutdownNow()
     }
   }
 
@@ -182,7 +191,7 @@ private[spark] class HadoopDelegationTokenManager(
 
   private def scheduleRenewal(delay: Long): Unit = {
     val _delay = math.max(0, delay)
-    logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
+    logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")
 
     val renewalTask = new Runnable() {
       override def run(): Unit = {
@@ -206,6 +215,9 @@ private[spark] class HadoopDelegationTokenManager(
       schedulerRef.send(UpdateDelegationTokens(tokens))
       tokens
     } catch {
+      case _: InterruptedException =>
+        // Ignore, may happen if shutting down.
+        null
       case e: Exception =>
         val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
         logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
@@ -239,11 +251,19 @@ private[spark] class HadoopDelegationTokenManager(
   }
 
   private def doLogin(): UserGroupInformation = {
-    logInfo(s"Attempting to login to KDC using principal: $principal")
-    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
-    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
-    logInfo("Successfully logged into KDC.")
-    ugi
+    if (principal != null) {
+      logInfo(s"Attempting to login to KDC using principal: $principal")
+      require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
+      val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+      logInfo("Successfully logged into KDC.")
+      ugi
+    } else {
+      logInfo(s"Attempting to load user's ticket cache.")
+      val ccache = sparkConf.getenv("KRB5CCNAME")
+      val user = Option(sparkConf.getenv("KRB5PRINCIPAL")).getOrElse(
+        UserGroupInformation.getCurrentUser().getUserName())
+      UserGroupInformation.getUGIFromTicketCache(ccache, user)
+    }
   }
 
   private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
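
As a standalone illustration of the ticket-cache branch of doLogin() above, the sketch
below uses only public Hadoop UGI calls. It assumes Hadoop security is configured for
Kerberos and that KRB5CCNAME / KRB5PRINCIPAL, when set, describe a valid, readable cache;
the object name is made up:

    import org.apache.hadoop.security.UserGroupInformation

    object TicketCacheLoginSketch {
      def login(): UserGroupInformation = {
        // May be null, as in the patch; Hadoop then falls back to the default cache location.
        val ccache = sys.env.getOrElse("KRB5CCNAME", null)
        val user = sys.env.getOrElse("KRB5PRINCIPAL",
          UserGroupInformation.getCurrentUser().getUserName())
        UserGroupInformation.getUGIFromTicketCache(ccache, user)
      }
    }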
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 2cd160c..3bc40de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -100,7 +100,7 @@ private[deploy] class HadoopFSDelegationTokenProvider
       creds: Credentials): Credentials = {
 
     filesystems.foreach { fs =>
-      logInfo("getting token for: " + fs)
+      logInfo(s"getting token for: $fs with renewer $renewer")
       fs.addDelegationTokens(renewer, creds)
     }
 
@@ -114,22 +114,22 @@ private[deploy] class HadoopFSDelegationTokenProvider
     // We cannot use the tokens generated with renewer yarn. Trying to renew
     // those will fail with an access control issue. So create new tokens with the logged in
     // user as renewer.
-    sparkConf.get(PRINCIPAL).flatMap { renewer =>
-      val creds = new Credentials()
-      fetchDelegationTokens(renewer, filesystems, creds)
-
-      val renewIntervals = creds.getAllTokens.asScala.filter {
-        _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
-      }.flatMap { token =>
-        Try {
-          val newExpiration = token.renew(hadoopConf)
-          val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-          val interval = newExpiration - identifier.getIssueDate
-          logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
-          interval
-        }.toOption
-      }
-      if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
+    val renewer = UserGroupInformation.getCurrentUser().getUserName()
+
+    val creds = new Credentials()
+    fetchDelegationTokens(renewer, filesystems, creds)
+
+    val renewIntervals = creds.getAllTokens.asScala.filter {
+      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+    }.flatMap { token =>
+      Try {
+        val newExpiration = token.renew(hadoopConf)
+        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+        val interval = newExpiration - identifier.getIssueDate
+        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+        interval
+      }.toOption
     }
+    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
   }
 }
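
The renewal-interval computation above, extracted into a rough standalone sketch. The
object and method names are invented, and it assumes the current user (now always used
as the renewer) is allowed to renew the tokens it just fetched:

    import scala.collection.JavaConverters._
    import scala.util.Try

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

    object RenewalIntervalSketch {
      def minRenewalInterval(hadoopConf: Configuration, filesystems: Set[FileSystem]): Option[Long] = {
        val renewer = UserGroupInformation.getCurrentUser().getUserName()
        val creds = new Credentials()
        filesystems.foreach(_.addDelegationTokens(renewer, creds))

        val intervals = creds.getAllTokens.asScala
          .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
          .flatMap { token =>
            Try {
              val newExpiration = token.renew(hadoopConf)  // one renewal to learn the expiration time
              val id = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
              newExpiration - id.getIssueDate               // interval = expiration - issue date
            }.toOption
          }
        if (intervals.isEmpty) None else Some(intervals.min)
      }
    }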
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 32559ae..66b1ff7 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -354,6 +354,16 @@ package object config {
     .timeConf(TimeUnit.SECONDS)
     .createWithDefaultString("1m")
 
+  private[spark] val KERBEROS_RENEWAL_CREDENTIALS =
+    ConfigBuilder("spark.kerberos.renewal.credentials")
+      .doc(
+        "Which credentials to use when renewing delegation tokens for executors. Can be either " +
+        "'keytab', the default, which requires a keytab to be provided, or 'ccache', which uses " +
+        "the local credentials cache.")
+      .stringConf
+      .checkValues(Set("keytab", "ccache"))
+      .createWithDefault("keytab")
+
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .intConf
     .createOptional
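
For comparison, the same ConfigBuilder pattern applied to a hypothetical entry
(spark.example.mode is not a real key). ConfigBuilder is private[spark], so a definition
like this would only compile inside the org.apache.spark.internal.config package object:

    private[spark] val EXAMPLE_MODE =
      ConfigBuilder("spark.example.mode")  // hypothetical key, for illustration only
        .doc("A validated string setting with a default, mirroring the entry above.")
        .stringConf
        .checkValues(Set("a", "b"))
        .createWithDefault("a")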
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 3b05875..6bc0bdd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -110,14 +110,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   private val reviveThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
-  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-    extends ThreadSafeRpcEndpoint with Logging {
+  class DriverEndpoint extends ThreadSafeRpcEndpoint with Logging {
+
+    override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
 
     // Executors that have been lost, but for which we don't yet know the real exit reason.
     protected val executorsPendingLossReason = new HashSet[String]
 
     protected val addressToExecutorId = new HashMap[RpcAddress, String]
 
+    // Spark configuration sent to executors. This is a lazy val so that subclasses of the
+    // scheduler can modify the SparkConf object before this view is created.
+    private lazy val sparkProperties = scheduler.sc.conf.getAll
+      .filter { case (k, _) => k.startsWith("spark.") }
+      .toSeq
+
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
       val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
@@ -386,23 +393,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
-  var driverEndpoint: RpcEndpointRef = null
+  val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
   override def start() {
-    val properties = new ArrayBuffer[(String, String)]
-    for ((key, value) <- scheduler.sc.conf.getAll) {
-      if (key.startsWith("spark.")) {
-        properties += ((key, value))
-      }
-    }
-
-    // TODO (prashant) send conf instead of properties
-    driverEndpoint = createDriverEndpointRef(properties)
-
     if (UserGroupInformation.isSecurityEnabled()) {
-      delegationTokenManager = createTokenManager(driverEndpoint)
+      delegationTokenManager = createTokenManager()
       delegationTokenManager.foreach { dtm =>
         val tokens = if (dtm.renewalEnabled) {
           dtm.start()
@@ -412,20 +409,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           SparkHadoopUtil.get.serialize(creds)
         }
         if (tokens != null) {
-          delegationTokens.set(tokens)
+          updateDelegationTokens(tokens)
         }
       }
     }
   }
 
-  protected def createDriverEndpointRef(
-      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
-    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
-  }
-
-  protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
-    new DriverEndpoint(rpcEnv, properties)
-  }
+  protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()
 
   def stopExecutors() {
     try {
@@ -715,12 +705,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Create the delegation token manager to be used for the application. This method is called
    * once during the start of the scheduler backend (so after the object has already been
    * fully constructed), only if security is enabled in the Hadoop configuration.
-   *
-   * @param schedulerRef RPC endpoint for the scheduler, where updated delegation tokens should be
-   *                     sent.
    */
-  protected def createTokenManager(
-      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = None
+  protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None
 
   /**
    * Called when a new set of delegation tokens is sent to the driver. Child classes can override
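
A small self-contained illustration of the lazy-initialization idea behind the new
sparkProperties field: because the snapshot is built on first use rather than at
construction time, a subclass constructor can still adjust the underlying settings
before any executor sees them. All names below are made up:

    import scala.collection.mutable

    class BaseBackend(settings: mutable.Map[String, String]) {
      // Built lazily, so subclass constructors have already run by the time it is forced.
      lazy val snapshot: Seq[(String, String)] =
        settings.filter { case (k, _) => k.startsWith("spark.") }.toSeq
    }

    class ChildBackend(settings: mutable.Map[String, String]) extends BaseBackend(settings) {
      settings("spark.child.extra") = "1"  // included in `snapshot`, since it has not been forced yet
    }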
diff --git a/docs/security.md b/docs/security.md
index 8416ed9..a1dc584 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -776,16 +776,32 @@ The following options provides finer-grained control for this feature:
 Long-running applications may run into issues if their run time exceeds the maximum delegation
 token lifetime configured in services it needs to access.
 
-Spark supports automatically creating new tokens for these applications when running in YARN mode.
-Kerberos credentials need to be provided to the Spark application via the `spark-submit` command,
-using the `--principal` and `--keytab` parameters.
+This feature is not available everywhere. In particular, it's only implemented
+on YARN and Kubernetes (both client and cluster modes), and on Mesos when using client mode.
 
-The provided keytab will be copied over to the machine running the Application Master via the Hadoop
-Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
-with encryption, at least.
+Spark supports automatically creating new tokens for these applications. There are two ways to
+enable this functionality.
 
-The Kerberos login will be periodically renewed using the provided credentials, and new delegation
-tokens for supported will be created.
+### Using a Keytab
+
+By providing Spark with a principal and keytab (e.g. using `spark-submit` with `--principal`
+and `--keytab` parameters), the application will maintain a valid Kerberos login that can be
+used to retrieve delegation tokens indefinitely.
+
+Note that when using a keytab in cluster mode, it will be copied over to the machine running the
+Spark driver. In the case of YARN, this means using HDFS as a staging area for the keytab, so it's
+strongly recommended that both YARN and HDFS be secured with encryption, at least.
+
+### Using a ticket cache
+
+By setting `spark.kerberos.renewal.credentials` to `ccache` in Spark's configuration, the local
+Kerberos ticket cache will be used for authentication. Spark will keep the ticket renewed during its
+renewable life, but after it expires a new ticket needs to be acquired (e.g. by running `kinit`).
+
+It's up to the user to maintain an updated ticket cache that Spark can use.
+
+The location of the ticket cache can be customized by setting the `KRB5CCNAME` environment
+variable.
 
 ## Secure Interaction with Kubernetes
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 0d2737e..4a91a2f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -144,17 +144,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
   }
 
-  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
-    new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  override def createDriverEndpoint(): DriverEndpoint = {
+    new KubernetesDriverEndpoint()
   }
 
-  override protected def createTokenManager(
-      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
-    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef))
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
   }
 
-  private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-      extends DriverEndpoint(rpcEnv, sparkProperties) {
+  private class KubernetesDriverEndpoint extends DriverEndpoint {
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       // Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index be2854f..7e2a8ba 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -772,9 +772,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     }
   }
 
-  override protected def createTokenManager(
-      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
-    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, schedulerRef))
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
   }
 
   private def numExecutors(): Int = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7582a2b..7523e3c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -306,26 +306,21 @@ private[spark] class Client(
   /**
    * Set up security tokens for launching our ApplicationMaster container.
    *
-   * This method will obtain delegation tokens from all the registered providers, and set them in
-   * the AM's launch context.
+   * In client mode, a set of credentials has been obtained by the scheduler, so they are copied
+   * and sent to the AM. In cluster mode, new credentials are obtained and then sent to the AM,
+   * along with whatever credentials the current user already has.
    */
   private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
-    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
-    credentialManager.obtainDelegationTokens(credentials)
-
-    // When using a proxy user, copy the delegation tokens to the user's credentials. Avoid
-    // that for regular users, since in those case the user already has access to the TGT,
-    // and adding delegation tokens could lead to expired or cancelled tokens being used
-    // later, as reported in SPARK-15754.
     val currentUser = UserGroupInformation.getCurrentUser()
-    if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
-      currentUser.addCredentials(credentials)
+    val credentials = currentUser.getCredentials()
+
+    if (isClusterMode) {
+      val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null)
+      credentialManager.obtainDelegationTokens(credentials)
     }
 
-    val dob = new DataOutputBuffer
-    credentials.writeTokenStorageToStream(dob)
-    amContainer.setTokens(ByteBuffer.wrap(dob.getData))
+    val serializedCreds = SparkHadoopUtil.get.serialize(credentials)
+    amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
   }
 
   /** Get the application report from the ResourceManager for an application we have submitted. */
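
The serialization step above goes through SparkHadoopUtil.serialize, a Spark-internal
helper. The sketch below shows roughly the same round trip with only public Hadoop APIs,
mirroring the inline DataOutputBuffer code this patch removes; the object name is invented:

    import java.nio.ByteBuffer

    import org.apache.hadoop.io.DataOutputBuffer
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    object CredentialBytesSketch {
      def currentUserTokenBytes(): ByteBuffer = {
        val credentials: Credentials = UserGroupInformation.getCurrentUser().getCredentials()
        val dob = new DataOutputBuffer()
        credentials.writeTokenStorageToStream(dob)
        // Wrap only the bytes actually written; the backing array may be larger.
        ByteBuffer.wrap(dob.getData, 0, dob.getLength)
      }
    }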
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 7b77f8c..c7c495f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -43,6 +43,8 @@ private[spark] class YarnClientSchedulerBackend(
    * This waits until the application is running.
    */
   override def start() {
+    super.start()
+
     val driverHost = conf.get(config.DRIVER_HOST_ADDRESS)
     val driverPort = conf.get(config.DRIVER_PORT)
     val hostport = driverHost + ":" + driverPort
@@ -57,14 +59,12 @@ private[spark] class YarnClientSchedulerBackend(
     client = new Client(args, conf, sc.env.rpcEnv)
     bindToYarn(client.submitApplication(), None)
 
-    // SPARK-8687: Ensure all necessary properties have already been set before
-    // we initialize our driver scheduler backend, which serves these properties
-    // to the executors
-    super.start()
     waitForApplication()
 
     monitorThread = asyncMonitorApplication()
     monitorThread.start()
+
+    startBindings()
   }
 
   /**
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 62bf981..e6680e1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
@@ -35,6 +36,7 @@ private[spark] class YarnClusterSchedulerBackend(
     bindToYarn(attemptId.getApplicationId(), Some(attemptId))
     super.start()
     totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(sc.conf)
+    startBindings()
   }
 
   override def getDriverLogUrls: Option[Map[String, String]] = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 3dae11e..821fbcd 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -90,11 +90,10 @@ private[spark] abstract class YarnSchedulerBackend(
     this.attemptId = attemptId
   }
 
-  override def start() {
+  protected def startBindings(): Unit = {
     require(appId.isDefined, "application ID unset")
     val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
     services.start(binding)
-    super.start()
   }
 
   override def stop(): Unit = {
@@ -209,8 +208,8 @@ private[spark] abstract class YarnSchedulerBackend(
     }
   }
 
-  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
-    new YarnDriverEndpoint(rpcEnv, properties)
+  override def createDriverEndpoint(): DriverEndpoint = {
+    new YarnDriverEndpoint()
   }
 
   /**
@@ -223,9 +222,8 @@ private[spark] abstract class YarnSchedulerBackend(
     sc.executorAllocationManager.foreach(_.reset())
   }
 
-  override protected def createTokenManager(
-      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
-    Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, schedulerRef))
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint))
   }
 
   /**
@@ -233,8 +231,7 @@ private[spark] abstract class YarnSchedulerBackend(
    * This endpoint communicates with the executors and queries the AM for an executor's exit
    * status when the executor is disconnected.
    */
-  private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-      extends DriverEndpoint(rpcEnv, sparkProperties) {
+  private class YarnDriverEndpoint extends DriverEndpoint {
 
     /**
      * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org