You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/15 13:47:44 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2708] Open engine session and renew engine credentials in the one
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 37229d419 [KYUUBI #2708] Open engine session and renew engine credentials in the one
37229d419 is described below
commit 37229d419ebbd4beaf459208813f3bb801c16a17
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Wed Jun 15 21:47:36 2022 +0800
[KYUUBI #2708] Open engine session and renew engine credentials in the one
### _Why are the changes needed?_
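To close #2708. Previously the server opened the engine session first and then renewed the engine credentials with a separate `sendCredentials` call; with this change the renewed credentials are passed under the reserved key `kyuubi.engine.credentials` in the open-session configuration, so the engine updates its delegation tokens while opening the session.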
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2856 from turboFei/open_in_one.
Closes #2708
2fd77951 [Fei Wang] comments
35ae00d4 [Fei Wang] comment
e02c13e3 [Fei Wang] use kyuubiconf
70d96775 [Fei Wang] refactor
cfb1a36b [Fei Wang] refresh token for startup
f55912f5 [Fei Wang] comments
130a3bb3 [Fei Wang] remove credentials to prevent escalate to session conf
d14ebf78 [Fei Wang] fix ut
71ec3c9e [Fei Wang] renew token when opening session
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 11 +++-
.../engine/spark/SparkTBinaryFrontendService.scala | 69 +++++++++++++---------
.../apache/kyuubi/config/KyuubiReservedKeys.scala | 1 +
.../credentials/HadoopCredentialsManager.scala | 13 +++-
.../org/apache/kyuubi/operation/LaunchEngine.scala | 17 +-----
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 23 +++++++-
6 files changed, 83 insertions(+), 51 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 9c1936157..51362f302 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.Utils._
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
@@ -179,7 +179,16 @@ object SparkSQLEngine extends Logging {
}
def createSpark(): SparkSession = {
+ val engineCredentials = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+ kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+ _sparkConf.remove(s"spark.${KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY}")
+
val session = SparkSession.builder.config(_sparkConf).getOrCreate
+
+ engineCredentials.filter(_.nonEmpty).foreach { credentials =>
+ SparkTBinaryFrontendService.renewDelegationToken(session.sparkContext, credentials)
+ }
+
KyuubiSparkUtil.initializeSparkSession(
session,
kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
index bbc6bfaa0..6b2250bd4 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
@@ -21,9 +21,11 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hive.service.rpc.thrift.{TOpenSessionReq, TOpenSessionResp, TRenewDelegationTokenReq, TRenewDelegationTokenResp}
+import org.apache.spark.SparkContext
import org.apache.spark.kyuubi.SparkContextHelper
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
import org.apache.kyuubi.service.TFrontendService._
@@ -43,21 +45,7 @@ class SparkTBinaryFrontendService(
// Server to Spark SQL engine
val resp = new TRenewDelegationTokenResp()
try {
- val newCreds = KyuubiHadoopUtils.decodeCredentials(req.getDelegationToken)
- val (hiveTokens, otherTokens) =
- KyuubiHadoopUtils.getTokenMap(newCreds).partition(_._2.getKind == HIVE_DELEGATION_TOKEN)
-
- val updateCreds = new Credentials()
- val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
- addHiveToken(hiveTokens, oldCreds, updateCreds)
- addOtherTokens(otherTokens, oldCreds, updateCreds)
- if (updateCreds.numberOfTokens() > 0) {
- info("Update delegation tokens. " +
- s"The number of tokens sent by the server is ${newCreds.numberOfTokens()}. " +
- s"The actual number of updated tokens is ${updateCreds.numberOfTokens()}.")
- SparkContextHelper.updateDelegationTokens(sc, updateCreds)
- }
-
+ renewDelegationToken(sc, req.getDelegationToken)
resp.setStatus(OK_STATUS)
} catch {
case e: Exception =>
@@ -75,6 +63,11 @@ class SparkTBinaryFrontendService(
val respConfiguration = new java.util.HashMap[String, String]()
respConfiguration.put("kyuubi.engine.id", sc.applicationId)
+ if (req.getConfiguration != null) {
+ val credentials = req.getConfiguration.remove(KYUUBI_ENGINE_CREDENTIALS_KEY)
+ Option(credentials).filter(_.nonEmpty).foreach(renewDelegationToken(sc, _))
+ }
+
val sessionHandle = getSessionHandle(req, resp)
resp.setSessionHandle(sessionHandle.toTSessionHandle)
resp.setConfiguration(respConfiguration)
@@ -88,7 +81,38 @@ class SparkTBinaryFrontendService(
resp
}
+ override lazy val discoveryService: Option[Service] = {
+ if (ServiceDiscovery.supportServiceDiscovery(conf)) {
+ Some(new EngineServiceDiscovery(this))
+ } else {
+ None
+ }
+ }
+}
+
+object SparkTBinaryFrontendService extends Logging {
+
+ val HIVE_DELEGATION_TOKEN = new Text("HIVE_DELEGATION_TOKEN")
+
+ private[spark] def renewDelegationToken(sc: SparkContext, delegationToken: String): Unit = {
+ val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
+ val (hiveTokens, otherTokens) =
+ KyuubiHadoopUtils.getTokenMap(newCreds).partition(_._2.getKind == HIVE_DELEGATION_TOKEN)
+
+ val updateCreds = new Credentials()
+ val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
+ addHiveToken(sc, hiveTokens, oldCreds, updateCreds)
+ addOtherTokens(otherTokens, oldCreds, updateCreds)
+ if (updateCreds.numberOfTokens() > 0) {
+ info("Update delegation tokens. " +
+ s"The number of tokens sent by the server is ${newCreds.numberOfTokens()}. " +
+ s"The actual number of updated tokens is ${updateCreds.numberOfTokens()}.")
+ SparkContextHelper.updateDelegationTokens(sc, updateCreds)
+ }
+ }
+
private def addHiveToken(
+ sc: SparkContext,
newTokens: Map[Text, Token[_ <: TokenIdentifier]],
oldCreds: Credentials,
updateCreds: Credentials): Unit = {
@@ -153,17 +177,4 @@ class SparkTBinaryFrontendService(
}
}
}
-
- override lazy val discoveryService: Option[Service] = {
- if (ServiceDiscovery.supportServiceDiscovery(conf)) {
- Some(new EngineServiceDiscovery(this))
- } else {
- None
- }
- }
-}
-
-object SparkTBinaryFrontendService {
-
- val HIVE_DELEGATION_TOKEN = new Text("HIVE_DELEGATION_TOKEN")
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index 190f0b044..e19e69efa 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -21,6 +21,7 @@ object KyuubiReservedKeys {
final val KYUUBI_SESSION_USER_KEY = "kyuubi.session.user"
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
+ final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
"kyuubi.session.engine.launch.handle.guid"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET =
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index c005de5d8..0d923b214 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -155,6 +155,14 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
super.stop()
}
+ def renewCredentials(appUser: String): String = {
+ if (renewalExecutor.isEmpty) {
+ return ""
+ }
+ val userRef = getOrCreateUserCredentialsRef(appUser, true)
+ userRef.getEncodedCredentials
+ }
+
/**
* Send credentials to SQL engine which the specified session is talking to if
* [[HadoopCredentialsManager]] has a newer credentials.
@@ -166,13 +174,12 @@ class HadoopCredentialsManager private (name: String) extends AbstractService(na
def sendCredentialsIfNeeded(
sessionId: String,
appUser: String,
- send: String => Unit,
- waitUntilCredentialsReady: Boolean = false): Unit = {
+ send: String => Unit): Unit = {
if (renewalExecutor.isEmpty) {
return
}
- val userRef = getOrCreateUserCredentialsRef(appUser, waitUntilCredentialsReady)
+ val userRef = getOrCreateUserCredentialsRef(appUser)
val sessionEpoch = getSessionCredentialsEpoch(sessionId)
if (userRef.getEpoch > sessionEpoch) {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 41a6cadb7..8e087df6e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.operation
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager}
+import org.apache.kyuubi.session.KyuubiSessionImpl
class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Boolean)
extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
@@ -49,7 +49,6 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
setState(OperationState.RUNNING)
try {
session.openEngineSession(getOperationLog)
- renewEngineCredentials()
setState(OperationState.FINISHED)
} catch onError()
}
@@ -60,18 +59,4 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
if (!shouldRunAsync) getBackgroundHandle.get()
}
-
- private def renewEngineCredentials(): Unit = {
- val sessionManager = session.sessionManager.asInstanceOf[KyuubiSessionManager]
- try {
- sessionManager.credentialsManager.sendCredentialsIfNeeded(
- session.handle.identifier.toString,
- session.user,
- client.sendCredentials,
- true)
- } catch {
- case e: Exception =>
- error(s"Failed to renew engine credentials when launching engine", e)
- }
- }
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index cf14f49ec..1a4e4146c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -26,6 +26,7 @@ import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
import org.apache.kyuubi.engine.EngineRef
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
@@ -67,7 +68,9 @@ class KyuubiSessionImpl(
case (key, value) => sessionConf.set(key, value)
}
- val engine: EngineRef =
+ private lazy val engineCredentials = renewEngineCredentials()
+
+ lazy val engine: EngineRef =
new EngineRef(sessionConf, user, handle.identifier.toString, sessionManager.applicationManager)
private[kyuubi] val launchEngineOp = sessionManager.operationManager
.newLaunchEngineOperation(this, sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
@@ -98,6 +101,12 @@ class KyuubiSessionImpl(
private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = {
withDiscoveryClient(sessionConf) { discoveryClient =>
+ var openEngineSessionConf = optimizedConf
+ if (engineCredentials.nonEmpty) {
+ sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY, engineCredentials)
+ openEngineSessionConf =
+ optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
+ }
val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
val passwd =
if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
@@ -107,7 +116,7 @@ class KyuubiSessionImpl(
}
try {
_client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
- _engineSessionHandle = _client.openSession(protocol, user, passwd, optimizedConf)
+ _engineSessionHandle = _client.openSession(protocol, user, passwd, openEngineSessionConf)
} catch {
case e: Throwable =>
error(
@@ -157,6 +166,16 @@ class KyuubiSessionImpl(
}
}
+ private def renewEngineCredentials(): String = {
+ try {
+ sessionManager.credentialsManager.renewCredentials(user)
+ } catch {
+ case e: Exception =>
+ error(s"Failed to renew engine credentials for $handle", e)
+ ""
+ }
+ }
+
override def close(): Unit = {
super.close()
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)