You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/15 13:47:44 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2708] Open engine session and renew engine credentials in the one

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 37229d419 [KYUUBI #2708] Open engine session and renew engine credentials in the one
37229d419 is described below

commit 37229d419ebbd4beaf459208813f3bb801c16a17
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Wed Jun 15 21:47:36 2022 +0800

    [KYUUBI #2708] Open engine session and renew engine credentials in the one
    
    ### _Why are the changes needed?_
    
    To close #2708
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #2856 from turboFei/open_in_one.
    
    Closes #2708
    
    2fd77951 [Fei Wang] comments
    35ae00d4 [Fei Wang] comment
    e02c13e3 [Fei Wang] use kyuubiconf
    70d96775 [Fei Wang] refactor
    cfb1a36b [Fei Wang] refresh token for startup
    f55912f5 [Fei Wang] comments
    130a3bb3 [Fei Wang] remove credentials to prevent escalate to session conf
    d14ebf78 [Fei Wang] fix ut
    71ec3c9e [Fei Wang] renew token when opening session
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 11 +++-
 .../engine/spark/SparkTBinaryFrontendService.scala | 69 +++++++++++++---------
 .../apache/kyuubi/config/KyuubiReservedKeys.scala  |  1 +
 .../credentials/HadoopCredentialsManager.scala     | 13 +++-
 .../org/apache/kyuubi/operation/LaunchEngine.scala | 17 +-----
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  | 23 +++++++-
 6 files changed, 83 insertions(+), 51 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 9c1936157..51362f302 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.{KyuubiException, Logging, Utils}
 import org.apache.kyuubi.Utils._
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
@@ -179,7 +179,16 @@ object SparkSQLEngine extends Logging {
   }
 
   def createSpark(): SparkSession = {
+    val engineCredentials = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+    kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+    _sparkConf.remove(s"spark.${KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY}")
+
     val session = SparkSession.builder.config(_sparkConf).getOrCreate
+
+    engineCredentials.filter(_.nonEmpty).foreach { credentials =>
+      SparkTBinaryFrontendService.renewDelegationToken(session.sparkContext, credentials)
+    }
+
     KyuubiSparkUtil.initializeSparkSession(
       session,
       kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
index bbc6bfaa0..6b2250bd4 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
@@ -21,9 +21,11 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.hive.service.rpc.thrift.{TOpenSessionReq, TOpenSessionResp, TRenewDelegationTokenReq, TRenewDelegationTokenResp}
+import org.apache.spark.SparkContext
 import org.apache.spark.kyuubi.SparkContextHelper
 
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
 import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
 import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
 import org.apache.kyuubi.service.TFrontendService._
@@ -43,21 +45,7 @@ class SparkTBinaryFrontendService(
     // Server to Spark SQL engine
     val resp = new TRenewDelegationTokenResp()
     try {
-      val newCreds = KyuubiHadoopUtils.decodeCredentials(req.getDelegationToken)
-      val (hiveTokens, otherTokens) =
-        KyuubiHadoopUtils.getTokenMap(newCreds).partition(_._2.getKind == HIVE_DELEGATION_TOKEN)
-
-      val updateCreds = new Credentials()
-      val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
-      addHiveToken(hiveTokens, oldCreds, updateCreds)
-      addOtherTokens(otherTokens, oldCreds, updateCreds)
-      if (updateCreds.numberOfTokens() > 0) {
-        info("Update delegation tokens. " +
-          s"The number of tokens sent by the server is ${newCreds.numberOfTokens()}. " +
-          s"The actual number of updated tokens is ${updateCreds.numberOfTokens()}.")
-        SparkContextHelper.updateDelegationTokens(sc, updateCreds)
-      }
-
+      renewDelegationToken(sc, req.getDelegationToken)
       resp.setStatus(OK_STATUS)
     } catch {
       case e: Exception =>
@@ -75,6 +63,11 @@ class SparkTBinaryFrontendService(
       val respConfiguration = new java.util.HashMap[String, String]()
       respConfiguration.put("kyuubi.engine.id", sc.applicationId)
 
+      if (req.getConfiguration != null) {
+        val credentials = req.getConfiguration.remove(KYUUBI_ENGINE_CREDENTIALS_KEY)
+        Option(credentials).filter(_.nonEmpty).foreach(renewDelegationToken(sc, _))
+      }
+
       val sessionHandle = getSessionHandle(req, resp)
       resp.setSessionHandle(sessionHandle.toTSessionHandle)
       resp.setConfiguration(respConfiguration)
@@ -88,7 +81,38 @@ class SparkTBinaryFrontendService(
     resp
   }
 
+  override lazy val discoveryService: Option[Service] = {
+    if (ServiceDiscovery.supportServiceDiscovery(conf)) {
+      Some(new EngineServiceDiscovery(this))
+    } else {
+      None
+    }
+  }
+}
+
+object SparkTBinaryFrontendService extends Logging {
+
+  val HIVE_DELEGATION_TOKEN = new Text("HIVE_DELEGATION_TOKEN")
+
+  private[spark] def renewDelegationToken(sc: SparkContext, delegationToken: String): Unit = {
+    val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
+    val (hiveTokens, otherTokens) =
+      KyuubiHadoopUtils.getTokenMap(newCreds).partition(_._2.getKind == HIVE_DELEGATION_TOKEN)
+
+    val updateCreds = new Credentials()
+    val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
+    addHiveToken(sc, hiveTokens, oldCreds, updateCreds)
+    addOtherTokens(otherTokens, oldCreds, updateCreds)
+    if (updateCreds.numberOfTokens() > 0) {
+      info("Update delegation tokens. " +
+        s"The number of tokens sent by the server is ${newCreds.numberOfTokens()}. " +
+        s"The actual number of updated tokens is ${updateCreds.numberOfTokens()}.")
+      SparkContextHelper.updateDelegationTokens(sc, updateCreds)
+    }
+  }
+
   private def addHiveToken(
+      sc: SparkContext,
       newTokens: Map[Text, Token[_ <: TokenIdentifier]],
       oldCreds: Credentials,
       updateCreds: Credentials): Unit = {
@@ -153,17 +177,4 @@ class SparkTBinaryFrontendService(
       }
     }
   }
-
-  override lazy val discoveryService: Option[Service] = {
-    if (ServiceDiscovery.supportServiceDiscovery(conf)) {
-      Some(new EngineServiceDiscovery(this))
-    } else {
-      None
-    }
-  }
-}
-
-object SparkTBinaryFrontendService {
-
-  val HIVE_DELEGATION_TOKEN = new Text("HIVE_DELEGATION_TOKEN")
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index 190f0b044..e19e69efa 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -21,6 +21,7 @@ object KyuubiReservedKeys {
   final val KYUUBI_SESSION_USER_KEY = "kyuubi.session.user"
   final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
   final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
+  final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
   final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
     "kyuubi.session.engine.launch.handle.guid"
   final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET =
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index c005de5d8..0d923b214 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -155,6 +155,14 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
     super.stop()
   }
 
+  def renewCredentials(appUser: String): String = {
+    if (renewalExecutor.isEmpty) {
+      return ""
+    }
+    val userRef = getOrCreateUserCredentialsRef(appUser, true)
+    userRef.getEncodedCredentials
+  }
+
   /**
    * Send credentials to SQL engine which the specified session is talking to if
    * [[HadoopCredentialsManager]] has a newer credentials.
@@ -166,13 +174,12 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
   def sendCredentialsIfNeeded(
       sessionId: String,
       appUser: String,
-      send: String => Unit,
-      waitUntilCredentialsReady: Boolean = false): Unit = {
+      send: String => Unit): Unit = {
     if (renewalExecutor.isEmpty) {
       return
     }
 
-    val userRef = getOrCreateUserCredentialsRef(appUser, waitUntilCredentialsReady)
+    val userRef = getOrCreateUserCredentialsRef(appUser)
     val sessionEpoch = getSessionCredentialsEpoch(sessionId)
 
     if (userRef.getEpoch > sessionEpoch) {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 41a6cadb7..8e087df6e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.operation
 
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager}
+import org.apache.kyuubi.session.KyuubiSessionImpl
 
 class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Boolean)
   extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
@@ -49,7 +49,6 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
       setState(OperationState.RUNNING)
       try {
         session.openEngineSession(getOperationLog)
-        renewEngineCredentials()
         setState(OperationState.FINISHED)
       } catch onError()
     }
@@ -60,18 +59,4 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
 
     if (!shouldRunAsync) getBackgroundHandle.get()
   }
-
-  private def renewEngineCredentials(): Unit = {
-    val sessionManager = session.sessionManager.asInstanceOf[KyuubiSessionManager]
-    try {
-      sessionManager.credentialsManager.sendCredentialsIfNeeded(
-        session.handle.identifier.toString,
-        session.user,
-        client.sendCredentials,
-        true)
-    } catch {
-      case e: Exception =>
-        error(s"Failed to renew engine credentials when launching engine", e)
-    }
-  }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index cf14f49ec..1a4e4146c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -26,6 +26,7 @@ import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
 import org.apache.kyuubi.engine.EngineRef
 import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
@@ -67,7 +68,9 @@ class KyuubiSessionImpl(
     case (key, value) => sessionConf.set(key, value)
   }
 
-  val engine: EngineRef =
+  private lazy val engineCredentials = renewEngineCredentials()
+
+  lazy val engine: EngineRef =
     new EngineRef(sessionConf, user, handle.identifier.toString, sessionManager.applicationManager)
   private[kyuubi] val launchEngineOp = sessionManager.operationManager
     .newLaunchEngineOperation(this, sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
@@ -98,6 +101,12 @@ class KyuubiSessionImpl(
 
   private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = {
     withDiscoveryClient(sessionConf) { discoveryClient =>
+      var openEngineSessionConf = optimizedConf
+      if (engineCredentials.nonEmpty) {
+        sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
+        openEngineSessionConf =
+          optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
+      }
       val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
       val passwd =
         if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
@@ -107,7 +116,7 @@ class KyuubiSessionImpl(
         }
       try {
         _client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
-        _engineSessionHandle = _client.openSession(protocol, user, passwd, optimizedConf)
+        _engineSessionHandle = _client.openSession(protocol, user, passwd, openEngineSessionConf)
       } catch {
         case e: Throwable =>
           error(
@@ -157,6 +166,16 @@ class KyuubiSessionImpl(
     }
   }
 
+  private def renewEngineCredentials(): String = {
+    try {
+      sessionManager.credentialsManager.renewCredentials(user)
+    } catch {
+      case e: Exception =>
+        error(s"Failed to renew engine credentials for $handle", e)
+        ""
+    }
+  }
+
   override def close(): Unit = {
     super.close()
     sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)