You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/11/07 10:03:36 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3766] Support real user for KyuubiSession

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 20fca4cfa [KYUUBI #3766] Support real user for KyuubiSession
20fca4cfa is described below

commit 20fca4cfa48a8d8b8c84d9181c490dc37de460bf
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Mon Nov 7 18:03:25 2022 +0800

    [KYUUBI #3766] Support real user for KyuubiSession
    
    ### _Why are the changes needed?_
    
    Support real user for kyuubi session
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3766 from turboFei/real_user.
    
    Closes #3766
    
    49c996ee [Fei Wang] refactor
    894b4720 [Fei Wang] owner -> user
    477119bf [Fei Wang] address
    c36511ae [Fei Wang] add ut
    8efc310c [Fei Wang] save
    8ca2d589 [Fei Wang] add ut
    89264d90 [Fei Wang] get real user
    a431b0ad [Fei Wang] real user
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../apache/kyuubi/config/KyuubiReservedKeys.scala  |  1 +
 .../apache/kyuubi/service/TFrontendService.scala   | 30 ++++++++++++++--------
 .../org/apache/kyuubi/KerberizedTestHelper.scala   |  3 ++-
 .../kyuubi/server/KyuubiRestFrontendService.scala  | 14 ++++++----
 .../kyuubi/server/KyuubiTHttpFrontendService.scala | 12 ++++-----
 .../kyuubi/server/api/v1/AdminResource.scala       |  6 ++---
 .../kyuubi/server/api/v1/BatchesResource.scala     | 13 +++++-----
 .../kyuubi/server/api/v1/SessionsResource.scala    |  7 ++---
 .../kyuubi/server/mysql/MySQLCommandHandler.scala  |  6 +++--
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    |  3 +--
 .../org/apache/kyuubi/session/KyuubiSession.scala  |  4 ++-
 .../org/apache/kyuubi/RestClientTestHelper.scala   |  6 ++++-
 .../operation/KyuubiOperationPerGroupSuite.scala   | 14 +++++++++-
 .../operation/KyuubiRestAuthenticationSuite.scala  | 18 ++++++++++++-
 .../server/rest/client/BatchRestApiSuite.scala     | 18 +++++++++++--
 15 files changed, 110 insertions(+), 45 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index 4e214cfa1..b0d869cff 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.config
 object KyuubiReservedKeys {
   final val KYUUBI_CLIENT_IP_KEY = "kyuubi.client.ipAddress"
   final val KYUUBI_SESSION_USER_KEY = "kyuubi.session.user"
+  final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
   final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
   final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
   final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index b9ee7beb5..98a4f332e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -32,7 +32,7 @@ import org.apache.thrift.transport.TTransport
 import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
 import org.apache.kyuubi.Utils.stringifyException
 import org.apache.kyuubi.config.KyuubiConf.FRONTEND_CONNECTION_URL_USE_HOSTNAME
-import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_CLIENT_IP_KEY, KYUUBI_SESSION_CONNECTION_URL_KEY}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_CLIENT_IP_KEY, KYUUBI_SESSION_CONNECTION_URL_KEY, KYUUBI_SESSION_REAL_USER_KEY}
 import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle}
 import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
 import org.apache.kyuubi.session.SessionHandle
@@ -136,15 +136,23 @@ abstract class TFrontendService(name: String)
     }
   }
 
-  protected def getUserName(req: TOpenSessionReq): String = {
+  /**
+   * Get the real user and the session user.
+   * The real user is the user used for session authentication.
+   * The session user is the proxy user if proxy user is provided, otherwise is the real user.
+   */
+  protected def getRealUserAndSessionUser(req: TOpenSessionReq): (String, String) = {
     val realUser: String =
       ServiceUtils.getShortName(authFactory.getRemoteUser.getOrElse(req.getUsername))
-    if (req.getConfiguration == null) {
-      realUser
-    } else {
-      getProxyUser(req.getConfiguration, authFactory.getIpAddress.orNull, realUser)
-    }
+    val sessionUser =
+      if (req.getConfiguration == null) {
+        realUser
+      } else {
+        getProxyUser(req.getConfiguration, authFactory.getIpAddress.orNull, realUser)
+      }
+    realUser -> sessionUser
   }
+
   protected def getIpAddress: String = {
     authFactory.getIpAddress.orNull
   }
@@ -157,15 +165,17 @@ abstract class TFrontendService(name: String)
   protected def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp): SessionHandle = {
     val protocol = getMinVersion(SERVER_VERSION, req.getClient_protocol)
     res.setServerProtocolVersion(protocol)
-    val userName = getUserName(req)
+    val (realUser, sessionUser) = getRealUserAndSessionUser(req)
     val ipAddress = getIpAddress
     val configuration =
       Map(KYUUBI_CLIENT_IP_KEY -> ipAddress) ++
         Option(req.getConfiguration).map(_.asScala.toMap).getOrElse(Map.empty[String, String]) ++
-        Map(KYUUBI_SESSION_CONNECTION_URL_KEY -> connectionUrl)
+        Map(
+          KYUUBI_SESSION_CONNECTION_URL_KEY -> connectionUrl,
+          KYUUBI_SESSION_REAL_USER_KEY -> realUser)
     val sessionHandle = be.openSession(
       protocol,
-      userName,
+      sessionUser,
       req.getPassword,
       ipAddress,
       configuration)
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
index d877f1d52..3d4ed0a2a 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
@@ -34,6 +34,7 @@ import org.ietf.jgss.{GSSContext, GSSException, GSSManager, GSSName}
 import org.scalatest.time.SpanSugar._
 
 trait KerberizedTestHelper extends KyuubiFunSuite {
+  val clientPrincipalUser = "client"
   val baseDir: File =
     Utils.createTempDir("kyuubi-kdc", Utils.getCodeSourceLocation(getClass)).toFile
   val kdcConf = MiniKdc.createConf()
@@ -74,7 +75,7 @@ trait KerberizedTestHelper extends KyuubiFunSuite {
           throw e
       }
     }
-    val tempTestPrincipal = s"client/$hostName"
+    val tempTestPrincipal = s"$clientPrincipalUser/$hostName"
     val tempSpnegoPrincipal = s"HTTP/$hostName"
     kdc.createPrincipal(keytabFile, tempTestPrincipal, tempSpnegoPrincipal)
     rewriteKrb5Conf()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index a58312897..aa328b3c8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -174,17 +174,21 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
     super.stop()
   }
 
-  def getUserName(hs2ProxyUser: String): String = {
+  def getRealUser(): String = {
+    ServiceUtils.getShortName(
+      Option(AuthenticationFilter.getUserName).filter(_.nonEmpty).getOrElse("anonymous"))
+  }
+
+  def getSessionUser(hs2ProxyUser: String): String = {
     val sessionConf = Option(hs2ProxyUser).filter(_.nonEmpty).map(proxyUser =>
       Map(KyuubiAuthenticationFactory.HS2_PROXY_USER -> proxyUser)).getOrElse(Map())
-    getUserName(sessionConf)
+    getSessionUser(sessionConf)
   }
 
-  def getUserName(sessionConf: Map[String, String]): String = {
+  def getSessionUser(sessionConf: Map[String, String]): String = {
     // using the remote ip address instead of that in proxy http header for authentication
     val ipAddress = AuthenticationFilter.getUserIpAddress
-    val realUser: String = ServiceUtils.getShortName(
-      Option(AuthenticationFilter.getUserName).filter(_.nonEmpty).getOrElse("anonymous"))
+    val realUser: String = getRealUser()
     try {
       getProxyUser(sessionConf, ipAddress, realUser)
     } catch {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala
index 4c0e2b875..63933aa77 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala
@@ -274,15 +274,13 @@ final class KyuubiTHttpFrontendService(
     Option(SessionManager.getProxyHttpHeaderIpAddress).getOrElse(SessionManager.getIpAddress)
   }
 
-  override protected def getUserName(req: TOpenSessionReq): String = {
-    var userName: String = SessionManager.getUserName
+  override protected def getRealUserAndSessionUser(req: TOpenSessionReq): (String, String) = {
+    val realUser = getShortName(Option(SessionManager.getUserName).getOrElse(req.getUsername))
     // using the remote ip address instead of that in proxy http header for authentication
     val ipAddress: String = SessionManager.getIpAddress
-    if (userName == null) userName = req.getUsername
-    userName = getShortName(userName)
-    val effectiveClientUser: String = getProxyUser(req.getConfiguration, ipAddress, userName)
-    debug("Client's username: " + effectiveClientUser)
-    effectiveClientUser
+    val sessionUser: String = getProxyUser(req.getConfiguration, ipAddress, realUser)
+    debug(s"Client's real user: $realUser, session user: $sessionUser")
+    realUser -> sessionUser
   }
 
   private def getShortName(userName: String): String = {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index bc62e6983..37eb36afe 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -52,7 +52,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
   @POST
   @Path("refresh/hadoop_conf")
   def refreshFrontendHadoopConf(): Response = {
-    val userName = fe.getUserName(Map.empty[String, String])
+    val userName = fe.getSessionUser(Map.empty[String, String])
     val ipAddress = fe.getIpAddress
     info(s"Receive refresh Kyuubi server hadoop conf request from $userName/$ipAddress")
     if (!userName.equals(administrator)) {
@@ -76,7 +76,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
       @QueryParam("sharelevel") shareLevel: String,
       @QueryParam("subdomain") subdomain: String,
       @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response = {
-    val userName = fe.getUserName(hs2ProxyUser)
+    val userName = fe.getSessionUser(hs2ProxyUser)
     val engine = getEngine(userName, engineType, shareLevel, subdomain, "default")
     val engineSpace = getEngineSpace(engine)
 
@@ -111,7 +111,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
       @QueryParam("sharelevel") shareLevel: String,
       @QueryParam("subdomain") subdomain: String,
       @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Seq[Engine] = {
-    val userName = fe.getUserName(hs2ProxyUser)
+    val userName = fe.getSessionUser(hs2ProxyUser)
     val engine = getEngine(userName, engineType, shareLevel, subdomain, "")
     val engineSpace = getEngineSpace(engine)
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 9fd1c2bda..6900c1a7f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.client.exception.KyuubiRestException
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_CLIENT_IP_KEY, KYUUBI_SESSION_CONNECTION_URL_KEY}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_CLIENT_IP_KEY, KYUUBI_SESSION_CONNECTION_URL_KEY, KYUUBI_SESSION_REAL_USER_KEY}
 import org.apache.kyuubi.engine.ApplicationInfo
 import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
 import org.apache.kyuubi.server.api.ApiRequestContext
@@ -166,12 +166,13 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
     require(request.getClassName != null, "classname is a required parameter")
     request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))
 
-    val userName = fe.getUserName(request.getConf.asScala.toMap)
+    val userName = fe.getSessionUser(request.getConf.asScala.toMap)
     val ipAddress = fe.getIpAddress
     request.setConf(
       (request.getConf.asScala ++ Map(
         KYUUBI_CLIENT_IP_KEY -> ipAddress,
-        KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl)).asJava)
+        KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
+        KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
     val sessionHandle = sessionManager.openBatchSession(
       userName,
       "anonymous",
@@ -190,7 +191,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
   @GET
   @Path("{batchId}")
   def batchInfo(@PathParam("batchId") batchId: String): Batch = {
-    val userName = fe.getUserName(Map.empty[String, String])
+    val userName = fe.getSessionUser(Map.empty[String, String])
     val sessionHandle = formatSessionHandle(batchId)
     Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
       buildBatch(batchSession)
@@ -267,7 +268,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       @PathParam("batchId") batchId: String,
       @QueryParam("from") @DefaultValue("-1") from: Int,
       @QueryParam("size") @DefaultValue("100") size: Int): OperationLog = {
-    val userName = fe.getUserName(Map.empty[String, String])
+    val userName = fe.getSessionUser(Map.empty[String, String])
     val sessionHandle = formatSessionHandle(batchId)
     Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
       try {
@@ -312,7 +313,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): CloseBatchResponse = {
     val sessionHandle = formatSessionHandle(batchId)
 
-    val userName = fe.getUserName(hs2ProxyUser)
+    val userName = fe.getSessionUser(hs2ProxyUser)
 
     Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
       if (userName != batchSession.user) {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index f28036c0f..093b8c08c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -32,7 +32,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TProtocolVersion}
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.client.api.v1.dto
 import org.apache.kyuubi.client.api.v1.dto._
-import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_CLIENT_IP_KEY, KYUUBI_SESSION_CONNECTION_URL_KEY}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_CLIENT_IP_KEY, KYUUBI_SESSION_CONNECTION_URL_KEY, KYUUBI_SESSION_REAL_USER_KEY}
 import org.apache.kyuubi.events.KyuubiEvent
 import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.server.api.ApiRequestContext
@@ -141,7 +141,7 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging {
   @POST
   @Consumes(Array(MediaType.APPLICATION_JSON))
   def openSession(request: SessionOpenRequest): dto.SessionHandle = {
-    val userName = fe.getUserName(request.getConfigs.asScala.toMap)
+    val userName = fe.getSessionUser(request.getConfigs.asScala.toMap)
     val ipAddress = fe.getIpAddress
     val handle = fe.be.openSession(
       TProtocolVersion.findByValue(request.getProtocolVersion),
@@ -150,7 +150,8 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging {
       ipAddress,
       (request.getConfigs.asScala ++ Map(
         KYUUBI_CLIENT_IP_KEY -> ipAddress,
-        KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl)).toMap)
+        KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
+        KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).toMap)
     new dto.SessionHandle(handle.identifier)
   }
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
index 5515f34df..03d437e09 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
@@ -27,7 +27,7 @@ import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_CONNECTION_URL_KEY, KYUUBI_SESSION_REAL_USER_KEY}
 import org.apache.kyuubi.operation.FetchOrientation
 import org.apache.kyuubi.operation.OperationState._
 import org.apache.kyuubi.server.mysql.MySQLCommandHandler._
@@ -107,7 +107,9 @@ class MySQLCommandHandler(connectionUrl: String, be: BackendService, execPool: T
         user,
         "",
         remoteIp,
-        sessionConf ++ Map(KYUUBI_SESSION_CONNECTION_URL_KEY -> connectionUrl))
+        sessionConf ++ Map(
+          KYUUBI_SESSION_CONNECTION_URL_KEY -> connectionUrl,
+          KYUUBI_SESSION_REAL_USER_KEY -> user))
       sessionHandle
     } catch {
       case rethrow: Exception =>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 2b8d7ef35..24dcc0477 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -114,8 +114,7 @@ class KyuubiBatchSessionImpl(
       val metaData = Metadata(
         identifier = handle.identifier.toString,
         sessionType = sessionType,
-        // TODO: support real user
-        realUser = user,
+        realUser = realUser,
         username = user,
         ipAddress = ipAddress,
         kyuubiInstance = connectionUrl,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
index 988b3ee41..e2c692820 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala
@@ -18,7 +18,7 @@ package org.apache.kyuubi.session
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_CONNECTION_URL_KEY, KYUUBI_SESSION_REAL_USER_KEY}
 import org.apache.kyuubi.events.KyuubiSessionEvent
 import org.apache.kyuubi.session.SessionType.SessionType
 
@@ -35,6 +35,8 @@ abstract class KyuubiSession(
 
   val connectionUrl = conf.get(KYUUBI_SESSION_CONNECTION_URL_KEY).getOrElse("")
 
+  val realUser = conf.get(KYUUBI_SESSION_REAL_USER_KEY).getOrElse(user)
+
   def getSessionEvent: Option[KyuubiSessionEvent]
 
   def checkSessionAccessPathURIs(): Unit
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestClientTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestClientTestHelper.scala
index d15993797..5b3627381 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestClientTestHelper.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestClientTestHelper.scala
@@ -38,6 +38,8 @@ trait RestClientTestHelper extends RestFrontendTestHelper with KerberizedTestHel
     super.afterAll()
   }
 
+  protected val otherConfigs: Map[String, String] = Map.empty
+
   override protected lazy val conf: KyuubiConf = {
     val config = new Configuration()
     val authType = "hadoop.security.authentication"
@@ -46,7 +48,7 @@ trait RestClientTestHelper extends RestFrontendTestHelper with KerberizedTestHel
     UserGroupInformation.setConfiguration(config)
     assert(UserGroupInformation.isSecurityEnabled)
 
-    KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("KERBEROS", "LDAP", "CUSTOM"))
+    val conf = KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("KERBEROS", "LDAP", "CUSTOM"))
       .set(KyuubiConf.SERVER_KEYTAB.key, testKeytab)
       .set(KyuubiConf.SERVER_PRINCIPAL, testPrincipal)
       .set(KyuubiConf.SERVER_SPNEGO_KEYTAB, testKeytab)
@@ -56,6 +58,8 @@ trait RestClientTestHelper extends RestFrontendTestHelper with KerberizedTestHel
       .set(
         KyuubiConf.AUTHENTICATION_CUSTOM_CLASS,
         classOf[UserDefineAuthenticationProviderImpl].getCanonicalName)
+    otherConfigs.foreach(kv => conf.set(kv._1, kv._2))
+    conf
   }
 
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerGroupSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerGroupSuite.scala
index 5a7634408..173b4d293 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerGroupSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerGroupSuite.scala
@@ -19,8 +19,9 @@ package org.apache.kyuubi.operation
 
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.kyuubi.WithKyuubiServer
+import org.apache.kyuubi.{Utils, WithKyuubiServer}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.session.KyuubiSession
 
 class KyuubiOperationPerGroupSuite extends WithKyuubiServer with SparkQueryTests {
 
@@ -76,4 +77,15 @@ class KyuubiOperationPerGroupSuite extends WithKyuubiServer with SparkQueryTests
       }
     }
   }
+
+  test("support real user for kyuubi session") {
+    withSessionConf(Map("hive.server2.proxy.user" -> "user1"))(Map.empty)(Map.empty) {
+      withJdbcStatement() { _ =>
+        val session =
+          server.backendService.sessionManager.allSessions().head.asInstanceOf[KyuubiSession]
+        assert(session.realUser === Utils.currentUser)
+        assert(session.user === "user1")
+      }
+    }
+  }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiRestAuthenticationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiRestAuthenticationSuite.scala
index 8bb5d1a76..858aac00e 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiRestAuthenticationSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiRestAuthenticationSuite.scala
@@ -33,9 +33,17 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
 import org.apache.kyuubi.server.http.authentication.AuthSchemes
 import org.apache.kyuubi.service.authentication.InternalSecurityAccessor
+import org.apache.kyuubi.session.KyuubiSession
 
 class KyuubiRestAuthenticationSuite extends RestClientTestHelper {
 
+  override protected val otherConfigs: Map[String, String] = {
+    // allow to impersonate other users with spnego authentication
+    Map(
+      s"hadoop.proxyuser.$clientPrincipalUser.groups" -> "*",
+      s"hadoop.proxyuser.$clientPrincipalUser.hosts" -> "*")
+  }
+
   test("test with LDAP authorization") {
     val encodeAuthorization = new String(
       Base64.getEncoder.encode(
@@ -118,6 +126,7 @@ class KyuubiRestAuthenticationSuite extends RestClientTestHelper {
   }
 
   test("test with ugi wrapped open session") {
+    val proxyUser = "user1"
     UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
     var token = generateToken(hostName)
     val sessionOpenRequest = new SessionOpenRequest(
@@ -125,7 +134,9 @@ class KyuubiRestAuthenticationSuite extends RestClientTestHelper {
       "kyuubi",
       "pass",
       "localhost",
-      Map(KyuubiConf.ENGINE_SHARE_LEVEL.key -> "CONNECTION").asJava)
+      Map(
+        KyuubiConf.ENGINE_SHARE_LEVEL.key -> "CONNECTION",
+        "hive.server2.proxy.user" -> proxyUser).asJava)
 
     var response = webTarget.path("api/v1/sessions")
       .request()
@@ -134,6 +145,11 @@ class KyuubiRestAuthenticationSuite extends RestClientTestHelper {
 
     assert(HttpServletResponse.SC_OK == response.getStatus)
     val sessionHandle = response.readEntity(classOf[SessionHandle])
+    val session = server.backendService.sessionManager.getSession(
+      org.apache.kyuubi.session.SessionHandle.fromUUID(sessionHandle.getIdentifier.toString))
+      .asInstanceOf[KyuubiSession]
+    assert(session.realUser === clientPrincipalUser)
+    assert(session.user === proxyUser)
 
     token = generateToken(hostName)
     response = webTarget.path(s"api/v1/sessions/${sessionHandle.getIdentifier}")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
index c0a1343c6..b425f62d6 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala
@@ -26,9 +26,17 @@ import org.apache.kyuubi.client.{BatchRestApi, KyuubiRestClient}
 import org.apache.kyuubi.client.api.v1.dto.Batch
 import org.apache.kyuubi.client.exception.KyuubiRestException
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
+import org.apache.kyuubi.session.{KyuubiSession, SessionHandle}
 
 class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
 
+  override protected val otherConfigs: Map[String, String] = {
+    // allow to impersonate other users with spnego authentication
+    Map(
+      s"hadoop.proxyuser.$clientPrincipalUser.groups" -> "*",
+      s"hadoop.proxyuser.$clientPrincipalUser.hosts" -> "*")
+  }
+
   override protected def afterEach(): Unit = {
     eventually(timeout(5.seconds), interval(200.milliseconds)) {
       assert(MetricsSystem.counterValue(
@@ -108,6 +116,7 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
   }
 
   test("spnego batch rest client") {
+    val proxyUser = "user1"
     val spnegoKyuubiRestClient: KyuubiRestClient =
       KyuubiRestClient.builder(baseUri.toString)
         .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.SPNEGO)
@@ -115,11 +124,16 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
         .build()
     val batchRestApi: BatchRestApi = new BatchRestApi(spnegoKyuubiRestClient)
     // create batch
-    val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
+    val requestObj =
+      newSparkBatchRequest(Map("spark.master" -> "local", "hive.server2.proxy.user" -> proxyUser))
 
     var batch: Batch = batchRestApi.createBatch(requestObj)
     assert(batch.getKyuubiInstance === fe.connectionUrl)
     assert(batch.getBatchType === "SPARK")
+    val session = server.backendService.sessionManager.getSession(
+      SessionHandle.fromUUID(batch.getId)).asInstanceOf[KyuubiSession]
+    assert(session.realUser === clientPrincipalUser)
+    assert(session.user === proxyUser)
 
     // get batch by id
     batch = batchRestApi.getBatchById(batch.getId())
@@ -131,7 +145,7 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper {
     assert(log.getRowCount == 1)
 
     // delete batch
-    val closeResp = batchRestApi.deleteBatch(batch.getId(), null)
+    val closeResp = batchRestApi.deleteBatch(batch.getId(), proxyUser)
     assert(closeResp.getMsg.nonEmpty)
 
     // list batches