You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/03/15 06:04:21 UTC

[kyuubi] branch master updated: [KYUUBI #4498] list sessions/operations with conditions

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new a027b6e0a [KYUUBI #4498] list sessions/operations with conditions
a027b6e0a is described below

commit a027b6e0a187010c4d720f2619137d0625894362
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Wed Mar 15 14:04:14 2023 +0800

    [KYUUBI #4498] list sessions/operations with conditions
    
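    ### _Why are the changes needed?_
    
    Add an optional, comma-separated `users` query parameter to the admin `sessions` and `operations` REST endpoints so that the listed sessions/operations can be filtered by session user.
    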
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4507 from lightning-L/kyuubi-4498.
    
    Closes #4498
    
    6e3101288 [Tianlin Liao] [KYUUBI #4498] list sessions/operations with conditions
    
    Authored-by: Tianlin Liao <ti...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../kyuubi/server/api/v1/AdminResource.scala       | 19 +++++--
 .../kyuubi/server/api/v1/AdminResourceSuite.scala  | 64 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index e5401ca41..ee63243cc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.ListBuffer
 import io.swagger.v3.oas.annotations.media.{ArraySchema, Content, Schema}
 import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
+import org.apache.commons.lang3.StringUtils
 import org.apache.zookeeper.KeeperException.NoNodeException
 
 import org.apache.kyuubi.{KYUUBI_VERSION, Logging, Utils}
@@ -113,7 +114,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
     description = "get the list of all live sessions")
   @GET
   @Path("sessions")
-  def sessions(): Seq[SessionData] = {
+  def sessions(@QueryParam("users") users: String): Seq[SessionData] = {
     val userName = fe.getSessionUser(Map.empty[String, String])
     val ipAddress = fe.getIpAddress
     info(s"Received listing all live sessions request from $userName/$ipAddress")
@@ -121,7 +122,12 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
       throw new NotAllowedException(
         s"$userName is not allowed to list all live sessions")
     }
-    fe.be.sessionManager.allSessions().map { case session =>
+    var sessions = fe.be.sessionManager.allSessions()
+    if (StringUtils.isNotBlank(users)) {
+      val usersSet = users.split(",").toSet
+      sessions = sessions.filter(session => usersSet.contains(session.user))
+    }
+    sessions.map { case session =>
       ApiUtils.sessionData(session.asInstanceOf[KyuubiSession])
     }.toSeq
   }
@@ -154,7 +160,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
       "get the list of all active operations")
   @GET
   @Path("operations")
-  def listOperations(): Seq[OperationData] = {
+  def listOperations(@QueryParam("users") users: String): Seq[OperationData] = {
     val userName = fe.getSessionUser(Map.empty[String, String])
     val ipAddress = fe.getIpAddress
     info(s"Received listing all of the active operations request from $userName/$ipAddress")
@@ -162,7 +168,12 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
       throw new NotAllowedException(
         s"$userName is not allowed to list all the operations")
     }
-    fe.be.sessionManager.operationManager.allOperations()
+    var operations = fe.be.sessionManager.operationManager.allOperations()
+    if (StringUtils.isNotBlank(users)) {
+      val usersSet = users.split(",").toSet
+      operations = operations.filter(operation => usersSet.contains(operation.getSession.user))
+    }
+    operations
       .map(operation => ApiUtils.operationData(operation.asInstanceOf[KyuubiOperation])).toSeq
   }
 
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index 0fc912e7a..0e0a2415a 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -168,6 +168,70 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
     assert(sessions2.isEmpty)
   }
 
+  test("list sessions/operations with filter") {
+    fe.be.openSession(
+      HIVE_CLI_SERVICE_PROTOCOL_V2,
+      "admin",
+      "123456",
+      "localhost",
+      Map("testConfig" -> "testValue"))
+
+    fe.be.openSession(
+      HIVE_CLI_SERVICE_PROTOCOL_V2,
+      "admin",
+      "123456",
+      "localhost",
+      Map("testConfig" -> "testValue"))
+
+    fe.be.openSession(
+      HIVE_CLI_SERVICE_PROTOCOL_V2,
+      "test_user_1",
+      "xxxxxx",
+      "localhost",
+      Map("testConfig" -> "testValue"))
+
+    fe.be.openSession(
+      HIVE_CLI_SERVICE_PROTOCOL_V2,
+      "test_user_2",
+      "xxxxxx",
+      "localhost",
+      Map("testConfig" -> "testValue"))
+
+    val adminUser = Utils.currentUser
+    val encodeAuthorization = new String(
+      Base64.getEncoder.encode(
+        s"$adminUser:".getBytes()),
+      "UTF-8")
+
+    // list sessions
+    var response = webTarget.path("api/v1/admin/sessions")
+      .queryParam("users", "admin")
+      .request()
+      .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+      .get()
+    var sessions = response.readEntity(classOf[Seq[SessionData]])
+    assert(200 == response.getStatus)
+    assert(sessions.size == 2)
+
+    response = webTarget.path("api/v1/admin/sessions")
+      .queryParam("users", "test_user_1,test_user_2")
+      .request()
+      .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+      .get()
+    sessions = response.readEntity(classOf[Seq[SessionData]])
+    assert(200 == response.getStatus)
+    assert(sessions.size == 2)
+
+    // list operations
+    response = webTarget.path("api/v1/admin/operations")
+      .queryParam("users", "test_user_1,test_user_2")
+      .request()
+      .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+      .get()
+    val operations = response.readEntity(classOf[Seq[OperationData]])
+    assert(operations.size == 2)
+  }
+
   test("list/close operations") {
     val sessionHandle = fe.be.openSession(
       HIVE_CLI_SERVICE_PROTOCOL_V2,