Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/24 03:34:20 UTC
[kyuubi] branch master updated: [KYUUBI #4560] [KSHC] Support Kerberized HMS in cluster mode w/o keytab
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 6f803c001 [KYUUBI #4560] [KSHC] Support Kerberized HMS in cluster mode w/o keytab
6f803c001 is described below
commit 6f803c0015876171990ce366b6ffc274468aad25
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Fri Mar 24 11:34:08 2023 +0800
[KYUUBI #4560] [KSHC] Support Kerberized HMS in cluster mode w/o keytab
### _Why are the changes needed?_
This PR aims to make the Kyuubi Spark Hive Connector (KSHC) support a kerberized HMS in cluster mode w/o keytab (the typical use case in Kyuubi) by implementing a `HadoopDelegationTokenProvider`.
To enable access to a kerberized HMS using KSHC, the minimal configurations are:
```
spark.sql.catalog.warm=org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
spark.sql.catalog.warm.hive.metastore.uris=<thrift-uris>
```
then it is possible to run federated queries across metastores:
```
SELECT * FROM spark_catalog.db1.tbl1 JOIN warm.db2.tbl2 ON ...
```
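The same federated query can be issued programmatically; below is a minimal sketch, assuming an active `SparkSession` named `spark` and that the illustrative tables and join keys exist:
```
// Hedged example: catalog, database, table, and join-key names are illustrative only.
val df = spark.sql(
  """SELECT *
    |FROM spark_catalog.db1.tbl1 t1
    |JOIN warm.db2.tbl2 t2 ON t1.id = t2.id""".stripMargin)
df.show()
```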
In addition, token renewal can be disabled explicitly for each catalog:
```
spark.sql.catalog.warm.delegation.token.renewal.enabled=false
```
The current implementation has some limitations:
the catalog configuration must be present at Spark application bootstrap, which means the catalog configurations should be set in `spark-defaults.conf` or appended as `--conf`, like:
```
spark-[sql|shell|submit] \
--conf spark.sql.catalog.xxx=org.apache.kyuubi.spark.connector.hive.HiveTableCatalog \
--conf spark.sql.catalog.xxx.hive.abc=xyz
```
but it does not work for catalogs registered dynamically through the SET statement, e.g. `SET spark.sql.catalog.xxx=`, as sketched below.
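To illustrate the limitation, here is a hedged sketch (not code from this patch; the `dyn` catalog name is made up): the token provider only ever sees the immutable `SparkConf`, while a `SET` statement mutates the session-level `SQLConf`.
```
import org.apache.spark.SparkConf

// SparkConf is frozen at application start, populated from
// spark-defaults.conf and spark-submit --conf entries.
val sparkConf = new SparkConf()
// Bootstrap-time catalogs are discoverable here ...
val catalogs = sparkConf.getAllWithPrefix("spark.sql.catalog.")
// ... but a runtime `SET spark.sql.catalog.dyn=...` only changes SQLConf,
// so the provider can never observe the hypothetical `dyn` catalog.
```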
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
```
> (select count(*) from hive_2.mammut.test_7) union ( select count(*) from spark_catalog.test.test01 limit 1);
+-----------+
| count(1) |
+-----------+
| 4 |
| 1 |
+-----------+
2 rows selected (8.378 seconds)
```
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4560 from pan3793/shc-token.
Closes #4560
fe8cd0c6d [Cheng Pan] Centralized metastore token signature fallback logic
851159559 [Cheng Pan] comments
fc3b4d596 [Cheng Pan] hive.metastore.token.signature fallback to hive.metastore.uris
fb7eb033f [Cheng Pan] unused import
858b39024 [Cheng Pan] New catalog property delegation.token.renewal.enabled
28ec5a543 [Cheng Pan] disable hms client retry
52044d474 [Cheng Pan] update comments
33b241831 [Cheng Pan] [KSHC] Support Kerberos by implementing KyuubiHiveConnectorDelegationTokenProvider
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
...he.spark.security.HadoopDelegationTokenProvider | 18 ++
.../spark/connector/hive/HiveTableCatalog.scala | 7 +-
...yuubiHiveConnectorDelegationTokenProvider.scala | 205 +++++++++++++++++++++
.../hive/kyuubi/connector/HiveBridgeHelper.scala | 2 +
4 files changed, 229 insertions(+), 3 deletions(-)
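For context, the Spark SPI implemented by this patch has three members; the following is a hedged sketch with signatures inferred from the overrides in the diff below (`NoopProvider` is a made-up name):
```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.security.HadoopDelegationTokenProvider

// A do-nothing provider showing the members a provider must supply.
class NoopProvider extends HadoopDelegationTokenProvider {
  override def serviceName: String = "noop"
  override def delegationTokensRequired(
      sparkConf: SparkConf, hadoopConf: Configuration): Boolean = false
  override def obtainDelegationTokens(
      hadoopConf: Configuration, sparkConf: SparkConf,
      creds: Credentials): Option[Long] = None // optional next-renewal time
}
```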
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider b/extensions/spark/kyuubi-spark-connector-hive/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider
new file mode 100644
index 000000000..8d93dd80a
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider
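Registering the class in `META-INF/services` lets Spark discover it through the standard `java.util.ServiceLoader` mechanism; a hedged sketch of what that discovery amounts to (Spark's actual loading lives in its token manager, which is not part of this patch):
```
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.security.HadoopDelegationTokenProvider

// Load every provider named in a META-INF/services file on the classpath.
val providers = ServiceLoader.load(classOf[HadoopDelegationTokenProvider])
  .asScala.map(p => p.serviceName -> p).toMap
// With this connector jar on the classpath, the map should contain
// the "kyuubi-hive-connector" service.
```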
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index c4d71dbba..d4e0f5ea2 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
/**
* A [[TableCatalog]] that wraps HiveExternalCatalog as a V2 CatalogPlugin instance to access Hive.
@@ -73,9 +74,9 @@ class HiveTableCatalog(sparkSession: SparkSession)
private lazy val hadoopConf: Configuration = {
val conf = sparkSession.sessionState.newHadoopConf()
- catalogOptions.asScala.foreach {
- case (k, v) => conf.set(k, v)
- case _ =>
+ catalogOptions.asScala.foreach { case (k, v) => conf.set(k, v) }
+ if (catalogOptions.containsKey("hive.metastore.uris")) {
+ conf.set("hive.metastore.token.signature", metastoreTokenSignature(catalogOptions))
}
conf
}
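The `metastoreTokenSignature` helper used here is defined on the companion object at the bottom of the new provider file (see below); a hedged usage sketch with an illustrative URI:
```
import java.util.{HashMap => JHashMap}
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature

val opts = new JHashMap[String, String]()
opts.put("hive.metastore.uris", "thrift://hms-host:9083") // illustrative URI
// Without an explicit signature, the helper falls back to the metastore URIs ...
assert(metastoreTokenSignature(opts) == "thrift://hms-host:9083")
// ... and an explicit hive.metastore.token.signature wins when present.
opts.put("hive.metastore.token.signature", "warm-hms")
assert(metastoreTokenSignature(opts) == "warm-hms")
```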
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorDelegationTokenProvider.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorDelegationTokenProvider.scala
new file mode 100644
index 000000000..118f4f6b9
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorDelegationTokenProvider.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.lang.reflect.UndeclaredThrowableException
+import java.security.PrivilegedExceptionAction
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation}
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
+import org.apache.hadoop.security.token.Token
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.security.HadoopDelegationTokenProvider
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
+
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
+
+class KyuubiHiveConnectorDelegationTokenProvider
+ extends HadoopDelegationTokenProvider with Logging {
+
+ logInfo(s"Hadoop Delegation Token provider for service [$serviceName] is initialized.")
+
+ override def serviceName: String = "kyuubi-hive-connector"
+
+ private val classNotFoundErrorStr = "You are attempting to use the " +
+ s"${getClass.getCanonicalName}, but your Spark distribution is not built with Hive libraries."
+
+ private def hiveConf(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration,
+ hiveCatalogName: String): Option[HiveConf] = {
+ try {
+ val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
+ sparkConf.getAllWithPrefix(s"spark.sql.catalog.$hiveCatalogName.")
+ .foreach { case (k, v) => hiveConf.set(k, v) }
+ // Since `RetryingHiveMetaStoreClient` may block subsequent token obtaining, and
+ // `obtainDelegationTokens` is scheduled frequently anyway, it's fine to disable
+ // the Hive retry mechanism.
+ hiveConf.set("hive.metastore.fastpath", "false")
+ Some(hiveConf)
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Fail to create Hive Configuration", e)
+ None
+ case e: NoClassDefFoundError =>
+ logWarning(classNotFoundErrorStr, e)
+ None
+ }
+ }
+
+ private def extractHiveCatalogNames(sparkConf: SparkConf): Set[String] = sparkConf
+ .getAllWithPrefix("spark.sql.catalog.")
+ .filter { case (_, v) => v == classOf[HiveTableCatalog].getName }
+ .flatMap { case (k, _) => k.stripPrefix("spark.sql.catalog.").split("\\.").headOption }
+ .distinct.toSet
+
+ // The current implementation has the following limitation:
+ // the v2 catalog is kind of a SQL API, and we cannot get SQLConf here, thus only those
+ // configurations present at Spark application bootstrap will take effect, e.g.
+ // `spark-defaults.conf` or `spark-submit --conf k=v` will take effect, but catalogs
+ // registered dynamically by SET statement will not.
+ override def delegationTokensRequired(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration): Boolean = {
+ val hiveCatalogNames = extractHiveCatalogNames(sparkConf)
+ logDebug(s"Recognized Kyuubi Hive Connector catalogs: $hiveCatalogNames")
+ hiveCatalogNames.exists { hiveCatalogName =>
+ hiveConf(sparkConf, hadoopConf, hiveCatalogName).exists { remoteHmsConf =>
+ delegationTokensRequired(sparkConf, remoteHmsConf, hiveCatalogName)
+ }
+ }
+ }
+
+ private def delegationTokensRequired(
+ sparkConf: SparkConf,
+ hiveConf: HiveConf,
+ hiveCatalogName: String): Boolean = {
+ val tokenRenewalEnabled = sparkConf.getBoolean(
+ s"spark.sql.catalog.$hiveCatalogName.delegation.token.renewal.enabled",
+ true)
+ val metastoreUris =
+ sparkConf.get(s"spark.sql.catalog.$hiveCatalogName.hive.metastore.uris", "")
+ val tokenAlias = new Text(metastoreUris)
+ val currentToken = UserGroupInformation.getCurrentUser.getCredentials.getToken(tokenAlias)
+ tokenRenewalEnabled && metastoreUris.nonEmpty && currentToken == null &&
+ SecurityUtil.getAuthenticationMethod(hiveConf) != AuthenticationMethod.SIMPLE &&
+ hiveConf.getBoolean("hive.metastore.sasl.enabled", false) &&
+ (SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser) ||
+ (!Utils.isClientMode(sparkConf) && !sparkConf.contains("spark.kerberos.keytab")))
+ }
+
+ override def obtainDelegationTokens(
+ hadoopConf: Configuration,
+ sparkConf: SparkConf,
+ creds: Credentials): Option[Long] = {
+ val hmsClients: mutable.HashSet[IMetaStoreClient] = mutable.HashSet.empty
+ try {
+ val hiveCatalogNames = extractHiveCatalogNames(sparkConf)
+ logDebug(s"Recognized Hive catalogs: $hiveCatalogNames")
+
+ val requireTokenCatalogs = hiveCatalogNames.filter { hiveCatalogName =>
+ hiveConf(sparkConf, hadoopConf, hiveCatalogName).exists { remoteHmsConf =>
+ delegationTokensRequired(sparkConf, remoteHmsConf, hiveCatalogName)
+ }
+ }
+ logDebug(s"Require token Hive catalogs: $requireTokenCatalogs")
+
+ requireTokenCatalogs.foreach { hiveCatalogName =>
+ // one token request failure should not block the subsequent token obtaining
+ Utils.tryLogNonFatalError {
+ hiveConf(sparkConf, hadoopConf, hiveCatalogName).foreach { remoteHmsConf =>
+ val metastoreUrisKey = s"spark.sql.catalog.$hiveCatalogName.hive.metastore.uris"
+ val metastoreUris = sparkConf.get(metastoreUrisKey, "")
+ assert(metastoreUris.nonEmpty)
+
+ val catalogOptions =
+ sparkConf.getAllWithPrefix(s"spark.sql.catalog.$hiveCatalogName.").toMap
+ val tokenSignature = metastoreTokenSignature(catalogOptions.asJava)
+
+ val principalKey = "hive.metastore.kerberos.principal"
+ val principal = remoteHmsConf.getTrimmed(principalKey, "")
+ require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+
+ val currentUser = UserGroupInformation.getCurrentUser
+ logInfo(s"Getting Hive delegation token for ${currentUser.getUserName} against " +
+ s"$principal at $metastoreUris")
+
+ doAsRealUser {
+ val hmsClient = new HiveMetaStoreClient(remoteHmsConf, null, false)
+ hmsClients += hmsClient
+ val tokenStr = hmsClient.getDelegationToken(currentUser.getUserName, principal)
+ val hive2Token = new Token[DelegationTokenIdentifier]()
+ hive2Token.decodeFromUrlString(tokenStr)
+ hive2Token.setService(new Text(tokenSignature))
+ logDebug(s"Get Token from hive metastore: ${hive2Token.toString}")
+ val tokenAlias = new Text(metastoreUris)
+ creds.addToken(tokenAlias, hive2Token)
+ }
+ }
+ }
+ }
+ None
+ } catch {
+ case _: NoClassDefFoundError =>
+ logWarning(classNotFoundErrorStr)
+ None
+ } finally {
+ hmsClients.foreach { hmsClient => Utils.tryLogNonFatalError { hmsClient.close() } }
+ }
+ }
+
+ /**
+ * Run some code as the real logged in user (which may differ from the current user, for
+ * example, when using proxying).
+ */
+ private def doAsRealUser[T](fn: => T): T = {
+ val currentUser = UserGroupInformation.getCurrentUser
+ val realUser = Option(currentUser.getRealUser).getOrElse(currentUser)
+
+ // For some reason the Scala-generated anonymous class ends up causing an
+ // UndeclaredThrowableException, even if you annotate the method with @throws.
+ try {
+ realUser.doAs(new PrivilegedExceptionAction[T]() {
+ override def run(): T = fn
+ })
+ } catch {
+ case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e)
+ }
+ }
+}
+
+object KyuubiHiveConnectorDelegationTokenProvider {
+
+ // fallback to `hive.metastore.uris` if `hive.metastore.token.signature` is absent
+ def metastoreTokenSignature(catalogOptions: util.Map[String, String]): String = {
+ assert(catalogOptions.containsKey("hive.metastore.uris"))
+ val metastoreUris = catalogOptions.get("hive.metastore.uris")
+ catalogOptions.getOrDefault("hive.metastore.token.signature", metastoreUris)
+ }
+}
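On the consumer side, Hive's metastore client selects the delegation token from the current UGI whose service matches `hive.metastore.token.signature`, which is why `HiveTableCatalog` (above) derives the signature via `metastoreTokenSignature`; a hedged sketch of that lookup (the URI is illustrative):
```
import scala.collection.JavaConverters._
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.UserGroupInformation

val signature = "thrift://hms-host:9083" // the catalog's effective token signature
// Find the token whose service was set via hive2Token.setService(new Text(...)).
val token = UserGroupInformation.getCurrentUser.getTokens.asScala
  .find(_.getService == new Text(signature))
```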
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index ce1e445fc..349edd327 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -45,6 +45,8 @@ object HiveBridgeHelper {
val HiveShim = org.apache.spark.sql.hive.HiveShim
val InputFileBlockHolder = org.apache.spark.rdd.InputFileBlockHolder
val HadoopTableReader = org.apache.spark.sql.hive.HadoopTableReader
+ val SparkHadoopUtil = org.apache.spark.deploy.SparkHadoopUtil
+ val Utils = org.apache.spark.util.Utils
def postExternalCatalogEvent(sc: SparkContext, event: ExternalCatalogEvent): Unit = {
sc.listenerBus.post(event)