You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/23 04:32:53 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3527] [SPARK] Kyuubi should set env SPARK_USER_NAME for K8s deployment

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 37385124f [KYUUBI #3527] [SPARK] Kyuubi should set env SPARK_USER_NAME for K8s deployment
37385124f is described below

commit 37385124f00f9510521dbacb3df7d9993fc7f5ba
Author: zwangsheng <22...@qq.com>
AuthorDate: Fri Sep 23 12:32:41 2022 +0800

    [KYUUBI #3527] [SPARK] Kyuubi should set env SPARK_USER_NAME for K8s deployment
    
    ### _Why are the changes needed?_
    
    When running Spark on Kubernetes, Spark uses the env `SPARK_USER_NAME` as the user name.
    So Kyuubi should build the Spark engine with this env set, whether it uses a proxy user or a keytab.
    This conf only takes effect in the Kubernetes case.
    
    Ref: https://github.com/apache/spark/pull/23017
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3527 from zwangsheng/feature/add_spark_user_name.
    
    Closes #3527
    
    9596372c [zwangsheng] only k8s case
    ddd713fa [zwangsheng] fix
    48b9b225 [zwangsheng] add
    
    Authored-by: zwangsheng <22...@qq.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  | 37 ++++++++++++++++------
 .../engine/spark/SparkProcessBuilderSuite.scala    | 28 ++++++++++++++++
 2 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 85ee83491..3b5703bdd 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -91,10 +91,26 @@ class SparkProcessBuilder(
       buffer += s"${convertConfigKey(k)}=$v"
     }
 
-    // iff the keytab is specified, PROXY_USER is not supported
-    if (!useKeytab()) {
-      buffer += PROXY_USER
-      buffer += proxyUser
+    // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
+    def setSparkUserName(userName: String): Unit = {
+      clusterManager().foreach(cm => {
+        if (cm.startsWith("k8s://")) {
+          buffer += CONF
+          buffer += s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$userName"
+          buffer += CONF
+          buffer += s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$userName"
+        }
+      })
+    }
+
+    // if the keytab is specified, PROXY_USER is not supported
+    tryKeytab() match {
+      case None =>
+        setSparkUserName(proxyUser)
+        buffer += PROXY_USER
+        buffer += proxyUser
+      case Some(name) =>
+        setSparkUserName(name)
     }
 
     mainResource.foreach { r => buffer += r }
@@ -104,26 +120,27 @@ class SparkProcessBuilder(
 
   override protected def module: String = "kyuubi-spark-sql-engine"
 
-  private def useKeytab(): Boolean = {
+  private def tryKeytab(): Option[String] = {
     val principal = conf.getOption(PRINCIPAL)
     val keytab = conf.getOption(KEYTAB)
     if (principal.isEmpty || keytab.isEmpty) {
-      false
+      None
     } else {
       try {
         val ugi = UserGroupInformation
           .loginUserFromKeytabAndReturnUGI(principal.get, keytab.get)
-        val keytabEnabled = ugi.getShortUserName == proxyUser
-        if (!keytabEnabled) {
+        if (ugi.getShortUserName != proxyUser) {
           warn(s"The session proxy user: $proxyUser is not same with " +
             s"spark principal: ${ugi.getShortUserName}, so we can't support use keytab. " +
             s"Fallback to use proxy user.")
+          None
+        } else {
+          Some(ugi.getShortUserName)
         }
-        keytabEnabled
       } catch {
         case e: IOException =>
           error(s"Failed to login for ${principal.get}", e)
-          false
+          None
       }
     }
   }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index b6a6b7c2e..550940fc5 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -270,6 +270,34 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
     assert(!pb.toString.contains(engineRefId2))
     assert(pb.toString.contains(engineRefId))
   }
+
+  test("SparkProcessBuilder build spark engine with SPARK_USER_NAME") {
+    val proxyName = "kyuubi"
+    val conf1 = KyuubiConf(false).set("spark.master", "k8s://test:12345")
+    val b1 = new SparkProcessBuilder(proxyName, conf1)
+    val c1 = b1.toString.split(' ')
+    assert(c1.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName"))
+    assert(c1.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$proxyName"))
+
+    tryWithSecurityEnabled {
+      val conf2 = conf.set("spark.master", "k8s://test:12345")
+        .set("spark.kerberos.principal", testPrincipal)
+        .set("spark.kerberos.keytab", testKeytab)
+      val name = ServiceUtils.getShortName(testPrincipal)
+      val b2 = new SparkProcessBuilder(name, conf2)
+      val c2 = b2.toString.split(' ')
+      assert(c2.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$name"))
+      assert(c2.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$name"))
+      assert(!c2.contains(s"--proxy-user $name"))
+    }
+
+    // Test no-kubernetes case
+    val conf3 = KyuubiConf(false)
+    val b3 = new SparkProcessBuilder(proxyName, conf3)
+    val c3 = b3.toString.split(' ')
+    assert(!c3.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName"))
+    assert(!c3.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$proxyName"))
+  }
 }
 
 class FakeSparkProcessBuilder(config: KyuubiConf)