You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/03/15 06:04:21 UTC
[kyuubi] branch master updated: [KYUUBI #4498] list sessions/operations with conditions
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a027b6e0a [KYUUBI #4498] list sessions/operations with conditions
a027b6e0a is described below
commit a027b6e0a187010c4d720f2619137d0625894362
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Wed Mar 15 14:04:14 2023 +0800
[KYUUBI #4498] list sessions/operations with conditions
### _Why are the changes needed?_
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4507 from lightning-L/kyuubi-4498.
Closes #4498
6e3101288 [Tianlin Liao] [KYUUBI #4498] list sessions/operations with conditions
Authored-by: Tianlin Liao <ti...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../kyuubi/server/api/v1/AdminResource.scala | 19 +++++--
.../kyuubi/server/api/v1/AdminResourceSuite.scala | 64 ++++++++++++++++++++++
2 files changed, 79 insertions(+), 4 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index e5401ca41..ee63243cc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.ListBuffer
import io.swagger.v3.oas.annotations.media.{ArraySchema, Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
+import org.apache.commons.lang3.StringUtils
import org.apache.zookeeper.KeeperException.NoNodeException
import org.apache.kyuubi.{KYUUBI_VERSION, Logging, Utils}
@@ -113,7 +114,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
description = "get the list of all live sessions")
@GET
@Path("sessions")
- def sessions(): Seq[SessionData] = {
+ def sessions(@QueryParam("users") users: String): Seq[SessionData] = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Received listing all live sessions request from $userName/$ipAddress")
@@ -121,7 +122,12 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
throw new NotAllowedException(
s"$userName is not allowed to list all live sessions")
}
- fe.be.sessionManager.allSessions().map { case session =>
+ var sessions = fe.be.sessionManager.allSessions()
+ if (StringUtils.isNotBlank(users)) {
+ val usersSet = users.split(",").toSet
+ sessions = sessions.filter(session => usersSet.contains(session.user))
+ }
+ sessions.map { case session =>
ApiUtils.sessionData(session.asInstanceOf[KyuubiSession])
}.toSeq
}
@@ -154,7 +160,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
"get the list of all active operations")
@GET
@Path("operations")
- def listOperations(): Seq[OperationData] = {
+ def listOperations(@QueryParam("users") users: String): Seq[OperationData] = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Received listing all of the active operations request from $userName/$ipAddress")
@@ -162,7 +168,12 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
throw new NotAllowedException(
s"$userName is not allowed to list all the operations")
}
- fe.be.sessionManager.operationManager.allOperations()
+ var operations = fe.be.sessionManager.operationManager.allOperations()
+ if (StringUtils.isNotBlank(users)) {
+ val usersSet = users.split(",").toSet
+ operations = operations.filter(operation => usersSet.contains(operation.getSession.user))
+ }
+ operations
.map(operation => ApiUtils.operationData(operation.asInstanceOf[KyuubiOperation])).toSeq
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index 0fc912e7a..0e0a2415a 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -168,6 +168,70 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(sessions2.isEmpty)
}
+ test("list sessions/operations with filter") {
+ fe.be.openSession(
+ HIVE_CLI_SERVICE_PROTOCOL_V2,
+ "admin",
+ "123456",
+ "localhost",
+ Map("testConfig" -> "testValue"))
+
+ fe.be.openSession(
+ HIVE_CLI_SERVICE_PROTOCOL_V2,
+ "admin",
+ "123456",
+ "localhost",
+ Map("testConfig" -> "testValue"))
+
+ fe.be.openSession(
+ HIVE_CLI_SERVICE_PROTOCOL_V2,
+ "test_user_1",
+ "xxxxxx",
+ "localhost",
+ Map("testConfig" -> "testValue"))
+
+ fe.be.openSession(
+ HIVE_CLI_SERVICE_PROTOCOL_V2,
+ "test_user_2",
+ "xxxxxx",
+ "localhost",
+ Map("testConfig" -> "testValue"))
+
+ val adminUser = Utils.currentUser
+ val encodeAuthorization = new String(
+ Base64.getEncoder.encode(
+ s"$adminUser:".getBytes()),
+ "UTF-8")
+
+ // list sessions
+ var response = webTarget.path("api/v1/admin/sessions")
+ .queryParam("users", "admin")
+ .request()
+ .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+ .get()
+ var sessions = response.readEntity(classOf[Seq[SessionData]])
+ assert(200 == response.getStatus)
+ assert(sessions.size == 2)
+
+ response = webTarget.path("api/v1/admin/sessions")
+ .queryParam("users", "test_user_1,test_user_2")
+ .request()
+ .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+ .get()
+ sessions = response.readEntity(classOf[Seq[SessionData]])
+ assert(200 == response.getStatus)
+ assert(sessions.size == 2)
+
+ // list operations
+ response = webTarget.path("api/v1/admin/operations")
+ .queryParam("users", "test_user_1,test_user_2")
+ .request()
+ .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+ .get()
+ val operations = response.readEntity(classOf[Seq[OperationData]])
+ assert(operations.size == 2)
+ }
+
test("list/close operations") {
val sessionHandle = fe.be.openSession(
HIVE_CLI_SERVICE_PROTOCOL_V2,