Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/24 03:34:20 UTC

[kyuubi] branch master updated: [KYUUBI #4560] [KSHC] Support Kerberized HMS in cluster mode w/o keytab

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f803c001 [KYUUBI #4560] [KSHC] Support Kerberized HMS in cluster mode w/o keytab
6f803c001 is described below

commit 6f803c0015876171990ce366b6ffc274468aad25
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Fri Mar 24 11:34:08 2023 +0800

    [KYUUBI #4560] [KSHC] Support Kerberized HMS in cluster mode w/o keytab
    
    ### _Why are the changes needed?_
    
    This PR aims to make the Kyuubi Spark Hive Connector (KSHC) support a Kerberized HMS in cluster mode w/o keytab (the typical use case in Kyuubi) by implementing a `HadoopDelegationTokenProvider`.
    
    To enable access to a Kerberized HMS using KSHC, the minimal configurations are
    ```
    spark.sql.catalog.warm=org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
    spark.sql.catalog.warm.hive.metastore.uris=<thrift-uris>
    ```
    then it is possible to run federated queries across metastores
    ```
    SELECT * FROM spark_catalog.db1.tbl1 JOIN warm.db2.tbl2 ON ...
    ```
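    
    For reference, in the targeted scenario (cluster mode w/o keytab), a submission could look like the sketch below, assuming a valid Kerberos TGT on the client side; `<user>@<REALM>`, `<thrift-uris>`, and `<application-jar>` are placeholders:
    ```
    kinit <user>@<REALM>
    spark-submit \
      --deploy-mode cluster \
      --conf spark.sql.catalog.warm=org.apache.kyuubi.spark.connector.hive.HiveTableCatalog \
      --conf spark.sql.catalog.warm.hive.metastore.uris=<thrift-uris> \
      <application-jar>
    ```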
    
    In addition, it allows explicitly disabling token renewal for each catalog
    ```
    spark.sql.catalog.warm.delegation.token.renewal.enabled=false
    ```
    
    The current implementation has some limitations:
    
    the catalog configurations must be present at Spark application bootstrap, which means they should be set in `spark-defaults.conf` or appended via `--conf` like:
    ```
    spark-[sql|shell|submit] \
      --conf spark.sql.catalog.xxx=org.apache.kyuubi.spark.connector.hive.HiveTableCatalog \
      --conf spark.sql.catalog.xxx.hive.abc=xyz
    ```
    
    but it does not work for catalogs registered dynamically through a SET statement, e.g. `SET spark.sql.catalog.xxx=`, as in the sketch below
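    
    For illustration (reusing the `xxx` placeholder above), a session-level registration like the following makes the catalog resolvable for queries, but the token provider never sees its configuration, so no HMS delegation token is obtained for it:
    ```
    SET spark.sql.catalog.xxx=org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
    SET spark.sql.catalog.xxx.hive.metastore.uris=<thrift-uris>;
    ```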
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    ```
    > (select count(*) from hive_2.mammut.test_7) union ( select count(*) from spark_catalog.test.test01 limit 1);
    +-----------+
    | count(1)  |
    +-----------+
    | 4         |
    | 1         |
    +-----------+
    2 rows selected (8.378 seconds)
    ```
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4560 from pan3793/shc-token.
    
    Closes #4560
    
    fe8cd0c6d [Cheng Pan] Centralized metastore token signature fallback logic
    851159559 [Cheng Pan] comments
    fc3b4d596 [Cheng Pan] hive.metastore.token.signature fallback to hive.metastore.uris
    fb7eb033f [Cheng Pan] unused import
    858b39024 [Cheng Pan] New catalog property delegation.token.renewal.enabled
    28ec5a543 [Cheng Pan] disable hms client retry
    52044d474 [Cheng Pan] update comments
    33b241831 [Cheng Pan] [KSHC] Support Kerberos by implementing KyuubiHiveConnectorDelegationTokenProvider
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 ...he.spark.security.HadoopDelegationTokenProvider |  18 ++
 .../spark/connector/hive/HiveTableCatalog.scala    |   7 +-
 ...yuubiHiveConnectorDelegationTokenProvider.scala | 205 +++++++++++++++++++++
 .../hive/kyuubi/connector/HiveBridgeHelper.scala   |   2 +
 4 files changed, 229 insertions(+), 3 deletions(-)
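
For context, Spark discovers `HadoopDelegationTokenProvider` implementations through Java's `ServiceLoader`, which is why this commit registers the new provider in a `META-INF/services` file (first hunk below). A minimal sketch of that discovery mechanism (illustrative, not part of this commit; assumes the connector jar is on the classpath):

```
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.security.HadoopDelegationTokenProvider

// Enumerate all registered providers, as a ServiceLoader-based
// lookup like Spark's would see them.
val serviceNames = ServiceLoader.load(classOf[HadoopDelegationTokenProvider])
  .asScala.map(_.serviceName).toList
// Expected to include "kyuubi-hive-connector" once the connector jar is present.
```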

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider b/extensions/spark/kyuubi-spark-connector-hive/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider
new file mode 100644
index 000000000..8d93dd80a
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index c4d71dbba..d4e0f5ea2 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
 
 /**
  * A [[TableCatalog]] that wraps HiveExternalCatalog as a V2 CatalogPlugin instance to access Hive.
@@ -73,9 +74,9 @@ class HiveTableCatalog(sparkSession: SparkSession)
 
   private lazy val hadoopConf: Configuration = {
     val conf = sparkSession.sessionState.newHadoopConf()
-    catalogOptions.asScala.foreach {
-      case (k, v) => conf.set(k, v)
-      case _ =>
+    catalogOptions.asScala.foreach { case (k, v) => conf.set(k, v) }
+    if (catalogOptions.containsKey("hive.metastore.uris")) {
+      conf.set("hive.metastore.token.signature", metastoreTokenSignature(catalogOptions))
     }
     conf
   }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorDelegationTokenProvider.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorDelegationTokenProvider.scala
new file mode 100644
index 000000000..118f4f6b9
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorDelegationTokenProvider.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.lang.reflect.UndeclaredThrowableException
+import java.security.PrivilegedExceptionAction
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation}
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
+import org.apache.hadoop.security.token.Token
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.security.HadoopDelegationTokenProvider
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
+
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
+
+class KyuubiHiveConnectorDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  logInfo(s"Hadoop Delegation Token provider for service [$serviceName] is initialized.")
+
+  override def serviceName: String = "kyuubi-hive-connector"
+
+  private val classNotFoundErrorStr = "You are attempting to use the " +
+    s"${getClass.getCanonicalName}, but your Spark distribution is not built with Hive libraries."
+
+  private def hiveConf(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration,
+      hiveCatalogName: String): Option[HiveConf] = {
+    try {
+      val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
+      sparkConf.getAllWithPrefix(s"spark.sql.catalog.$hiveCatalogName.")
+        .foreach { case (k, v) => hiveConf.set(k, v) }
+      // The `RetryingHiveMetaStoreClient` may block subsequent token obtaining, and
+      // since `obtainDelegationTokens` is scheduled frequently, it's fine to disable
+      // the Hive retry mechanism.
+      hiveConf.set("hive.metastore.fastpath", "false")
+      Some(hiveConf)
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Fail to create Hive Configuration", e)
+        None
+      case e: NoClassDefFoundError =>
+        logWarning(classNotFoundErrorStr, e)
+        None
+    }
+  }
+
+  private def extractHiveCatalogNames(sparkConf: SparkConf): Set[String] = sparkConf
+    .getAllWithPrefix("spark.sql.catalog.")
+    .filter { case (_, v) => v == classOf[HiveTableCatalog].getName }
+    .flatMap { case (k, _) => k.stripPrefix("spark.sql.catalog.").split("\\.").headOption }
+    .distinct.toSet
+
+  // The current implementation has the following limitation:
+  // the v2 catalog is kind of a SQL API, and we cannot get SQLConf here, thus only
+  // those configurations present at the Spark application bootstrap take effect, e.g.
+  // `spark-defaults.conf` or `spark-submit --conf k=v` will take effect, but catalogs
+  // dynamically registered by SET statement will not.
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Boolean = {
+    val hiveCatalogNames = extractHiveCatalogNames(sparkConf)
+    logDebug(s"Recognized Kyuubi Hive Connector catalogs: $hiveCatalogNames")
+    hiveCatalogNames.exists { hiveCatalogName =>
+      hiveConf(sparkConf, hadoopConf, hiveCatalogName).exists { remoteHmsConf =>
+        delegationTokensRequired(sparkConf, remoteHmsConf, hiveCatalogName)
+      }
+    }
+  }
+
+  private def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hiveConf: HiveConf,
+      hiveCatalogName: String): Boolean = {
+    val tokenRenewalEnabled = sparkConf.getBoolean(
+      s"spark.sql.catalog.$hiveCatalogName.delegation.token.renewal.enabled",
+      true)
+    val metastoreUris =
+      sparkConf.get(s"spark.sql.catalog.$hiveCatalogName.hive.metastore.uris", "")
+    val tokenAlias = new Text(metastoreUris)
+    val currentToken = UserGroupInformation.getCurrentUser.getCredentials.getToken(tokenAlias)
+    tokenRenewalEnabled && metastoreUris.nonEmpty && currentToken == null &&
+    SecurityUtil.getAuthenticationMethod(hiveConf) != AuthenticationMethod.SIMPLE &&
+    hiveConf.getBoolean("hive.metastore.sasl.enabled", false) &&
+    (SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser) ||
+      (!Utils.isClientMode(sparkConf) && !sparkConf.contains("spark.kerberos.keytab")))
+  }
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    val hmsClients: mutable.HashSet[IMetaStoreClient] = mutable.HashSet.empty
+    try {
+      val hiveCatalogNames = extractHiveCatalogNames(sparkConf)
+      logDebug(s"Recognized Hive catalogs: $hiveCatalogNames")
+
+      val requireTokenCatalogs = hiveCatalogNames.filter { hiveCatalogName =>
+        hiveConf(sparkConf, hadoopConf, hiveCatalogName).exists { remoteHmsConf =>
+          delegationTokensRequired(sparkConf, remoteHmsConf, hiveCatalogName)
+        }
+      }
+      logDebug(s"Require token Hive catalogs: $requireTokenCatalogs")
+
+      requireTokenCatalogs.foreach { hiveCatalogName =>
+            // One token request failure should not block subsequent token obtaining.
+        Utils.tryLogNonFatalError {
+          hiveConf(sparkConf, hadoopConf, hiveCatalogName).foreach { remoteHmsConf =>
+            val metastoreUrisKey = s"spark.sql.catalog.$hiveCatalogName.hive.metastore.uris"
+            val metastoreUris = sparkConf.get(metastoreUrisKey, "")
+            assert(metastoreUris.nonEmpty)
+
+            val catalogOptions =
+              sparkConf.getAllWithPrefix(s"spark.sql.catalog.$hiveCatalogName.").toMap
+            val tokenSignature = metastoreTokenSignature(catalogOptions.asJava)
+
+            val principalKey = "hive.metastore.kerberos.principal"
+            val principal = remoteHmsConf.getTrimmed(principalKey, "")
+            require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+
+            val currentUser = UserGroupInformation.getCurrentUser
+            logInfo(s"Getting Hive delegation token for ${currentUser.getUserName} against " +
+              s"$principal at $metastoreUris")
+
+            doAsRealUser {
+              val hmsClient = new HiveMetaStoreClient(remoteHmsConf, null, false)
+              hmsClients += hmsClient
+              val tokenStr = hmsClient.getDelegationToken(currentUser.getUserName, principal)
+              val hive2Token = new Token[DelegationTokenIdentifier]()
+              hive2Token.decodeFromUrlString(tokenStr)
+              hive2Token.setService(new Text(tokenSignature))
+              logDebug(s"Get Token from hive metastore: ${hive2Token.toString}")
+              val tokenAlias = new Text(metastoreUris)
+              creds.addToken(tokenAlias, hive2Token)
+            }
+          }
+        }
+      }
+      None
+    } catch {
+      case _: NoClassDefFoundError =>
+        logWarning(classNotFoundErrorStr)
+        None
+    } finally {
+      hmsClients.foreach { hmsClient => Utils.tryLogNonFatalError { hmsClient.close() } }
+    }
+  }
+
+  /**
+   * Run some code as the real logged in user (which may differ from the current user, for
+   * example, when using proxying).
+   */
+  private def doAsRealUser[T](fn: => T): T = {
+    val currentUser = UserGroupInformation.getCurrentUser
+    val realUser = Option(currentUser.getRealUser).getOrElse(currentUser)
+
+    // For some reason the Scala-generated anonymous class ends up causing an
+    // UndeclaredThrowableException, even if you annotate the method with @throws.
+    try {
+      realUser.doAs(new PrivilegedExceptionAction[T]() {
+        override def run(): T = fn
+      })
+    } catch {
+      case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e)
+    }
+  }
+}
+
+object KyuubiHiveConnectorDelegationTokenProvider {
+
+  // Fall back to `hive.metastore.uris` if `hive.metastore.token.signature` is absent
+  def metastoreTokenSignature(catalogOptions: util.Map[String, String]): String = {
+    assert(catalogOptions.containsKey("hive.metastore.uris"))
+    val metastoreUris = catalogOptions.get("hive.metastore.uris")
+    catalogOptions.getOrDefault("hive.metastore.token.signature", metastoreUris)
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index ce1e445fc..349edd327 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -45,6 +45,8 @@ object HiveBridgeHelper {
   val HiveShim = org.apache.spark.sql.hive.HiveShim
   val InputFileBlockHolder = org.apache.spark.rdd.InputFileBlockHolder
   val HadoopTableReader = org.apache.spark.sql.hive.HadoopTableReader
+  val SparkHadoopUtil = org.apache.spark.deploy.SparkHadoopUtil
+  val Utils = org.apache.spark.util.Utils
 
   def postExternalCatalogEvent(sc: SparkContext, event: ExternalCatalogEvent): Unit = {
     sc.listenerBus.post(event)