You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2024/04/22 01:18:34 UTC

(kyuubi) branch branch-1.9 updated: [KYUUBI #6049] Support to filter sessions/operations with session type

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 031306db0 [KYUUBI #6049] Support to filter sessions/operations with session type
031306db0 is described below

commit 031306db0aa21c5d2e70ccb71c602bdd1263aa8a
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Sun Apr 21 18:18:19 2024 -0700

    [KYUUBI #6049] Support to filter sessions/operations with session type
    
    # :mag: Description
    Support to filter sessions/operations with session type.
    ## Issue References 🔗
    
    This pull request fixes #
    
    ## Describe Your Solution 🔧
    
    Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6049 from turboFei/batch_interactive.
    
    Closes #6049
    
    68390c774 [Wang, Fei] add ut
    bfc2cb8c2 [Fei Wang] save
    c1979c7ea [Fei Wang] saev
    
    Lead-authored-by: Fei Wang <fw...@ebay.com>
    Co-authored-by: Wang, Fei <fw...@ebay.com>
    Signed-off-by: Wang, Fei <fw...@ebay.com>
    (cherry picked from commit d299da77b3657801e30392b2db0aca2cbed23e56)
    Signed-off-by: Wang, Fei <fw...@ebay.com>
---
 .../org/apache/kyuubi/client/AdminRestApi.java     | 15 ++++++---
 .../kyuubi/server/api/v1/AdminResource.scala       | 16 ++++++++--
 .../kyuubi/server/api/v1/AdminResourceSuite.scala  | 37 ++++++++++++++++++++++
 3 files changed, 62 insertions(+), 6 deletions(-)

diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
index 002f6b46d..0f6fbbc47 100644
--- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
@@ -89,14 +89,17 @@ public class AdminRestApi {
   }
 
   public List<SessionData> listSessions() {
-    return listSessions(Collections.emptyList());
+    return listSessions(Collections.emptyList(), null);
   }
 
-  public List<SessionData> listSessions(List<String> users) {
+  public List<SessionData> listSessions(List<String> users, String sessionType) {
     Map<String, Object> params = new HashMap<>();
     if (users != null && !users.isEmpty()) {
       params.put("users", String.join(",", users));
     }
+    if (StringUtils.isNotBlank(sessionType)) {
+      params.put("sessionType", sessionType);
+    }
     SessionData[] result =
         this.getClient()
             .get(API_BASE_PATH + "/sessions", params, SessionData[].class, client.getAuthHeader());
@@ -109,10 +112,11 @@ public class AdminRestApi {
   }
 
   public List<OperationData> listOperations() {
-    return listOperations(Collections.emptyList(), null);
+    return listOperations(Collections.emptyList(), null, null);
   }
 
-  public List<OperationData> listOperations(List<String> users, String sessionHandleStr) {
+  public List<OperationData> listOperations(
+      List<String> users, String sessionHandleStr, String sessionType) {
     Map<String, Object> params = new HashMap<>();
     if (users != null && !users.isEmpty()) {
       params.put("users", String.join(",", users));
@@ -120,6 +124,9 @@ public class AdminRestApi {
     if (StringUtils.isNotBlank(sessionHandleStr)) {
       params.put("sessionHandle", sessionHandleStr);
     }
+    if (StringUtils.isNotBlank(sessionType)) {
+      params.put("sessionType", sessionType);
+    }
     OperationData[] result =
         this.getClient()
             .get(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index e31c792c3..2e61e6b08 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -168,7 +168,9 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
     description = "get the list of all live sessions")
   @GET
   @Path("sessions")
-  def sessions(@QueryParam("users") users: String): Seq[SessionData] = {
+  def sessions(
+      @QueryParam("users") users: String,
+      @QueryParam("sessionType") sessionType: String): Seq[SessionData] = {
     val userName = fe.getSessionUser(Map.empty[String, String])
     val ipAddress = fe.getIpAddress
     info(s"Received listing all live sessions request from $userName/$ipAddress")
@@ -177,6 +179,10 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
         s"$userName is not allowed to list all live sessions")
     }
     var sessions = fe.be.sessionManager.allSessions()
+    if (StringUtils.isNoneBlank(sessionType)) {
+      sessions = sessions.filter(session =>
+        sessionType.equals(session.asInstanceOf[KyuubiSession].sessionType.toString))
+    }
     if (StringUtils.isNotBlank(users)) {
       val usersSet = users.split(",").toSet
       sessions = sessions.filter(session => usersSet.contains(session.user))
@@ -214,7 +220,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
   @Path("operations")
   def listOperations(
       @QueryParam("users") users: String,
-      @QueryParam("sessionHandle") sessionHandle: String): Seq[OperationData] = {
+      @QueryParam("sessionHandle") sessionHandle: String,
+      @QueryParam("sessionType") sessionType: String): Seq[OperationData] = {
     val userName = fe.getSessionUser(Map.empty[String, String])
     val ipAddress = fe.getIpAddress
     info(s"Received listing all of the active operations request from $userName/$ipAddress")
@@ -231,6 +238,11 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
       operations = operations.filter(operation =>
         operation.getSession.handle.equals(SessionHandle.fromUUID(sessionHandle)))
     }
+    if (StringUtils.isNotBlank(sessionType)) {
+      operations = operations.filter(operation =>
+        sessionType.equalsIgnoreCase(
+          operation.getSession.asInstanceOf[KyuubiSession].sessionType.toString))
+    }
     operations
       .map(operation => ApiUtils.operationData(operation.asInstanceOf[KyuubiOperation])).toSeq
   }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index 95aa3de02..2360dea60 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -44,6 +44,7 @@ import org.apache.kyuubi.server.KyuubiRestFrontendService
 import org.apache.kyuubi.server.http.util.HttpAuthUtils
 import org.apache.kyuubi.server.http.util.HttpAuthUtils.AUTHORIZATION_HEADER
 import org.apache.kyuubi.service.authentication.AnonymousAuthenticationProviderImpl
+import org.apache.kyuubi.session.SessionType
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
 
 class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@@ -230,6 +231,42 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
     operations = response.readEntity(classOf[Seq[OperationData]])
     assert(response.getStatus === 200)
     assert(operations.size == 1)
+
+    response = webTarget.path("api/v1/admin/sessions")
+      .queryParam("sessionType", SessionType.INTERACTIVE.toString)
+      .request()
+      .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
+      .get()
+    sessions = response.readEntity(classOf[Seq[SessionData]])
+    assert(response.getStatus === 200)
+    assert(sessions.size > 0)
+
+    response = webTarget.path("api/v1/admin/sessions")
+      .queryParam("sessionType", SessionType.BATCH.toString)
+      .request()
+      .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
+      .get()
+    sessions = response.readEntity(classOf[Seq[SessionData]])
+    assert(response.getStatus === 200)
+    assert(sessions.size == 0)
+
+    response = webTarget.path("api/v1/admin/operations")
+      .queryParam("sessionType", SessionType.INTERACTIVE.toString)
+      .request()
+      .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
+      .get()
+    operations = response.readEntity(classOf[Seq[OperationData]])
+    assert(response.getStatus === 200)
+    assert(operations.size > 0)
+
+    response = webTarget.path("api/v1/admin/operations")
+      .queryParam("sessionType", SessionType.BATCH.toString)
+      .request()
+      .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
+      .get()
+    operations = response.readEntity(classOf[Seq[OperationData]])
+    assert(response.getStatus === 200)
+    assert(operations.size == 0)
   }
 
   test("list/close operations") {