You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2024/04/22 01:18:34 UTC
(kyuubi) branch branch-1.9 updated: [KYUUBI #6049] Support to filter sessions/operations with session type
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new 031306db0 [KYUUBI #6049] Support to filter sessions/operations with session type
031306db0 is described below
commit 031306db0aa21c5d2e70ccb71c602bdd1263aa8a
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Sun Apr 21 18:18:19 2024 -0700
[KYUUBI #6049] Support to filter sessions/operations with session type
# :mag: Description
Support to filter sessions/operations with session type.
## Issue References 🔗
This pull request fixes #
## Describe Your Solution 🔧
Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6049 from turboFei/batch_interactive.
Closes #6049
68390c774 [Wang, Fei] add ut
bfc2cb8c2 [Fei Wang] save
c1979c7ea [Fei Wang] saev
Lead-authored-by: Fei Wang <fw...@ebay.com>
Co-authored-by: Wang, Fei <fw...@ebay.com>
Signed-off-by: Wang, Fei <fw...@ebay.com>
(cherry picked from commit d299da77b3657801e30392b2db0aca2cbed23e56)
Signed-off-by: Wang, Fei <fw...@ebay.com>
---
.../org/apache/kyuubi/client/AdminRestApi.java | 15 ++++++---
.../kyuubi/server/api/v1/AdminResource.scala | 16 ++++++++--
.../kyuubi/server/api/v1/AdminResourceSuite.scala | 37 ++++++++++++++++++++++
3 files changed, 62 insertions(+), 6 deletions(-)
diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
index 002f6b46d..0f6fbbc47 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
@@ -89,14 +89,17 @@ public class AdminRestApi {
}
public List<SessionData> listSessions() {
- return listSessions(Collections.emptyList());
+ return listSessions(Collections.emptyList(), null);
}
- public List<SessionData> listSessions(List<String> users) {
+ public List<SessionData> listSessions(List<String> users, String sessionType) {
Map<String, Object> params = new HashMap<>();
if (users != null && !users.isEmpty()) {
params.put("users", String.join(",", users));
}
+ if (StringUtils.isNotBlank(sessionType)) {
+ params.put("sessionType", sessionType);
+ }
SessionData[] result =
this.getClient()
.get(API_BASE_PATH + "/sessions", params, SessionData[].class, client.getAuthHeader());
@@ -109,10 +112,11 @@ public class AdminRestApi {
}
public List<OperationData> listOperations() {
- return listOperations(Collections.emptyList(), null);
+ return listOperations(Collections.emptyList(), null, null);
}
- public List<OperationData> listOperations(List<String> users, String sessionHandleStr) {
+ public List<OperationData> listOperations(
+ List<String> users, String sessionHandleStr, String sessionType) {
Map<String, Object> params = new HashMap<>();
if (users != null && !users.isEmpty()) {
params.put("users", String.join(",", users));
@@ -120,6 +124,9 @@ public class AdminRestApi {
if (StringUtils.isNotBlank(sessionHandleStr)) {
params.put("sessionHandle", sessionHandleStr);
}
+ if (StringUtils.isNotBlank(sessionType)) {
+ params.put("sessionType", sessionType);
+ }
OperationData[] result =
this.getClient()
.get(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index e31c792c3..2e61e6b08 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -168,7 +168,9 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
description = "get the list of all live sessions")
@GET
@Path("sessions")
- def sessions(@QueryParam("users") users: String): Seq[SessionData] = {
+ def sessions(
+ @QueryParam("users") users: String,
+ @QueryParam("sessionType") sessionType: String): Seq[SessionData] = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Received listing all live sessions request from $userName/$ipAddress")
@@ -177,6 +179,10 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
s"$userName is not allowed to list all live sessions")
}
var sessions = fe.be.sessionManager.allSessions()
+ if (StringUtils.isNoneBlank(sessionType)) {
+ sessions = sessions.filter(session =>
+ sessionType.equals(session.asInstanceOf[KyuubiSession].sessionType.toString))
+ }
if (StringUtils.isNotBlank(users)) {
val usersSet = users.split(",").toSet
sessions = sessions.filter(session => usersSet.contains(session.user))
@@ -214,7 +220,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
@Path("operations")
def listOperations(
@QueryParam("users") users: String,
- @QueryParam("sessionHandle") sessionHandle: String): Seq[OperationData] = {
+ @QueryParam("sessionHandle") sessionHandle: String,
+ @QueryParam("sessionType") sessionType: String): Seq[OperationData] = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Received listing all of the active operations request from $userName/$ipAddress")
@@ -231,6 +238,11 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
operations = operations.filter(operation =>
operation.getSession.handle.equals(SessionHandle.fromUUID(sessionHandle)))
}
+ if (StringUtils.isNotBlank(sessionType)) {
+ operations = operations.filter(operation =>
+ sessionType.equalsIgnoreCase(
+ operation.getSession.asInstanceOf[KyuubiSession].sessionType.toString))
+ }
operations
.map(operation => ApiUtils.operationData(operation.asInstanceOf[KyuubiOperation])).toSeq
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index 95aa3de02..2360dea60 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -44,6 +44,7 @@ import org.apache.kyuubi.server.KyuubiRestFrontendService
import org.apache.kyuubi.server.http.util.HttpAuthUtils
import org.apache.kyuubi.server.http.util.HttpAuthUtils.AUTHORIZATION_HEADER
import org.apache.kyuubi.service.authentication.AnonymousAuthenticationProviderImpl
+import org.apache.kyuubi.session.SessionType
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@@ -230,6 +231,42 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
operations = response.readEntity(classOf[Seq[OperationData]])
assert(response.getStatus === 200)
assert(operations.size == 1)
+
+ response = webTarget.path("api/v1/admin/sessions")
+ .queryParam("sessionType", SessionType.INTERACTIVE.toString)
+ .request()
+ .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
+ .get()
+ sessions = response.readEntity(classOf[Seq[SessionData]])
+ assert(response.getStatus === 200)
+ assert(sessions.size > 0)
+
+ response = webTarget.path("api/v1/admin/sessions")
+ .queryParam("sessionType", SessionType.BATCH.toString)
+ .request()
+ .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
+ .get()
+ sessions = response.readEntity(classOf[Seq[SessionData]])
+ assert(response.getStatus === 200)
+ assert(sessions.size == 0)
+
+ response = webTarget.path("api/v1/admin/operations")
+ .queryParam("sessionType", SessionType.INTERACTIVE.toString)
+ .request()
+ .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
+ .get()
+ operations = response.readEntity(classOf[Seq[OperationData]])
+ assert(response.getStatus === 200)
+ assert(operations.size > 0)
+
+ response = webTarget.path("api/v1/admin/operations")
+ .queryParam("sessionType", SessionType.BATCH.toString)
+ .request()
+ .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
+ .get()
+ operations = response.readEntity(classOf[Seq[OperationData]])
+ assert(response.getStatus === 200)
+ assert(operations.size == 0)
}
test("list/close operations") {