You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/02/28 06:12:08 UTC

[kyuubi] branch master updated: [KYUUBI #4404] Support to list/close sessions in AdminResource

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d086d79fb [KYUUBI #4404] Support to list/close sessions in AdminResource
d086d79fb is described below

commit d086d79fbb52463f1dfbb4872b88682d5e2d2047
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Tue Feb 28 14:11:59 2023 +0800

    [KYUUBI #4404] Support to list/close sessions in AdminResource
    
    ### _Why are the changes needed?_
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4423 from lightning-L/kyuubi-4404.
    
    Closes #4404
    
    1570303a1 [Tianlin Liao] add response message
    e5921c992 [Tianlin Liao] [KYUUBI #4404] Support to list/close sessions in AdminResource
    
    Authored-by: Tianlin Liao <ti...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../kyuubi/server/api/v1/AdminResource.scala       | 51 +++++++++++++++++++++-
 .../kyuubi/server/api/v1/AdminResourceSuite.scala  | 46 ++++++++++++++++++-
 2 files changed, 94 insertions(+), 3 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index 104dd1045..a85cef16d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -24,12 +24,12 @@ import javax.ws.rs.core.{MediaType, Response}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.media.{ArraySchema, Content, Schema}
 import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
 
 import org.apache.kyuubi.{KYUUBI_VERSION, Logging, Utils}
-import org.apache.kyuubi.client.api.v1.dto.Engine
+import org.apache.kyuubi.client.api.v1.dto.{Engine, SessionData}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
@@ -37,6 +37,7 @@ import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceNodeInfo}
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
 import org.apache.kyuubi.server.KyuubiServer
 import org.apache.kyuubi.server.api.ApiRequestContext
+import org.apache.kyuubi.session.SessionHandle
 
 @Tag(name = "Admin")
 @Produces(Array(MediaType.APPLICATION_JSON))
@@ -102,6 +103,52 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
     Response.ok(s"Refresh the unlimited users successfully.").build()
   }
 
+  /** Lists every live session held by the session manager; only administrators may call it. */
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      array = new ArraySchema(schema = new Schema(implementation = classOf[SessionData])))),
+    description = "get the list of all live sessions")
+  @GET
+  @Path("sessions")
+  def sessions(): Seq[SessionData] = {
+    val requestUser = fe.getSessionUser(Map.empty[String, String])
+    val clientIp = fe.getIpAddress
+    info(s"Received listing all live sessions request from $requestUser/$clientIp")
+    if (!isAdministrator(requestUser)) {
+      throw new NotAllowedException(s"$requestUser is not allowed to list all live sessions")
+    }
+    // Convert each live session into the SessionData DTO returned to the client.
+    val payload = for (sess <- fe.be.sessionManager.allSessions()) yield new SessionData(
+      sess.handle.identifier.toString,
+      sess.user,
+      sess.ipAddress,
+      sess.conf.asJava,
+      sess.createTime,
+      sess.lastAccessTime - sess.createTime,
+      sess.getNoOperationTime)
+    payload.toSeq
+  }
+
+  /** Closes the session whose handle is given in the path; only administrators may call it. */
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
+    description = "Close a session")
+  @DELETE
+  @Path("sessions/{sessionHandle}")
+  def closeSession(@PathParam("sessionHandle") sessionHandleStr: String): Response = {
+    val user = fe.getSessionUser(Map.empty[String, String])
+    val clientIp = fe.getIpAddress
+    info(s"Received closing a session request from $user/$clientIp")
+    if (!isAdministrator(user)) {
+      throw new NotAllowedException(s"$user is not allowed to close the session $sessionHandleStr")
+    }
+    fe.be.closeSession(SessionHandle.fromUUID(sessionHandleStr))
+    Response.ok(s"Session $sessionHandleStr is closed successfully.").build()
+  }
+
   @ApiResponse(
     responseCode = "200",
     content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index ffd4a9140..9c050ba31 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -18,13 +18,17 @@
 package org.apache.kyuubi.server.api.v1
 
 import java.util.{Base64, UUID}
+import javax.ws.rs.client.Entity
 import javax.ws.rs.core.{GenericType, MediaType}
 
+import scala.collection.JavaConverters._
+
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
 import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, RestFrontendTestHelper, Utils}
-import org.apache.kyuubi.client.api.v1.dto.Engine
+import org.apache.kyuubi.client.api.v1.dto.{Engine, SessionData, SessionHandle, SessionOpenRequest}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
 import org.apache.kyuubi.engine.{ApplicationState, EngineRef, KyuubiApplicationManager}
 import org.apache.kyuubi.engine.EngineType.SPARK_SQL
 import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, USER}
@@ -123,6 +127,46 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
     assert(200 == response.getStatus)
   }
 
+  test("list/close sessions") {
+    // Open a session via the REST API so the admin endpoints have something to list.
+    val requestObj = new SessionOpenRequest(1, Map("testConfig" -> "testValue").asJava)
+    val openResponse = webTarget.path("api/v1/sessions")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+    assert(200 == openResponse.getStatus)
+    val sessionHandle = openResponse.readEntity(classOf[SessionHandle]).getIdentifier
+
+    // Encode with an explicit charset rather than the platform default.
+    val adminUser = Utils.currentUser
+    val encodeAuthorization = Base64.getEncoder.encodeToString(s"$adminUser:".getBytes("UTF-8"))
+    val basicAuth = s"BASIC $encodeAuthorization"
+    // A GenericType token keeps the element type, so entries are decoded as SessionData.
+    val sessionListType = new GenericType[Seq[SessionData]]() {}
+
+    // get session list
+    val listResponse = webTarget.path("api/v1/admin/sessions").request()
+      .header(AUTHORIZATION_HEADER, basicAuth)
+      .get()
+    assert(200 == listResponse.getStatus)
+    val sessions1 = listResponse.readEntity(sessionListType)
+    assert(sessions1.nonEmpty)
+    assert(sessions1.head.getConf.get(KYUUBI_SESSION_CONNECTION_URL_KEY) === fe.connectionUrl)
+
+    // close an opened session
+    val closeResponse = webTarget.path(s"api/v1/admin/sessions/$sessionHandle").request()
+      .header(AUTHORIZATION_HEADER, basicAuth)
+      .delete()
+    assert(200 == closeResponse.getStatus)
+
+    // get session list again
+    val relistResponse = webTarget.path("api/v1/admin/sessions").request()
+      .header(AUTHORIZATION_HEADER, basicAuth)
+      .get()
+    assert(200 == relistResponse.getStatus)
+    val sessions2 = relistResponse.readEntity(sessionListType)
+    assert(sessions2.isEmpty)
+  }
+
   test("delete engine - user share level") {
     val id = UUID.randomUUID().toString
     conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)