You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/12/06 05:17:45 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1445] [Kyuubi #1444] Implement api: /${version}/sessions/${session_identifier}/operations/${operation_identifier}

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 73ac1b6  [KYUUBI #1445] [Kyuubi #1444] Implement api: /${version}/sessions/${session_identifier}/operations/${operation_identifier}
73ac1b6 is described below

commit 73ac1b63dbab7e46b34a495f4cdf17211c4f99af
Author: simon <zh...@cvte.com>
AuthorDate: Mon Dec 6 13:17:33 2021 +0800

    [KYUUBI #1445] [Kyuubi #1444] Implement api: /${version}/sessions/${session_identifier}/operations/${operation_identifier}
    
    ### _Why are the changes needed?_
    This is a subtask of umbrella issue #KPIP-1
    
    /${version}/sessions/${session_identifier}/operations/${operation_identifier}
    - desc: get an operation with a given session identifier and operation identifier
    - method: GET
    - params: none
    - returns: an instance of OperationDetail
    
    /${version}/sessions/${session_identifier}/operations/${operation_identifier}
    - desc: remove operation based on a given session identifier and operation identifier
    - method: DELETE
    - params: none
    - returns: an instance of OperationHandle
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1445 from simon824/r5.
    
    Closes #1445
    
    196ea4c7 [simon] codestyle
    df1c20e8 [simon] remove isHasOperation
    6bdebab8 [simon] fix
    609aba1b [simon] codestyle
    dbae387d [simon] codestyle
    7eb9eeb8 [simon] Merge remote-tracking branch 'upstream/master' into r5
    8a9e78ca [simon] codestyle
    15b00aa9 [simon] operation
    b154cbc7 [simon] init
    
    Authored-by: simon <zh...@cvte.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../kyuubi/server/api/v1/SessionsResource.scala    | 94 ++++++++++++++++++----
 .../org/apache/kyuubi/server/api/v1/dto.scala      |  6 ++
 .../server/api/v1/SessionsResourceSuite.scala      | 90 ++++++++++++++++++++-
 3 files changed, 174 insertions(+), 16 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index d52dabf..8e30377 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -18,8 +18,7 @@
 package org.apache.kyuubi.server.api.v1
 
 import java.util.UUID
-import javax.ws.rs._
-import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces}
+import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces, _}
 import javax.ws.rs.core.{MediaType, Response}
 
 import scala.collection.JavaConverters._
@@ -32,7 +31,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TProtocolVersion}
 
 import org.apache.kyuubi.Utils.error
 import org.apache.kyuubi.cli.HandleIdentifier
-import org.apache.kyuubi.operation.OperationHandle
+import org.apache.kyuubi.operation.{OperationHandle, OperationType}
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.session.SessionHandle
 
@@ -62,7 +61,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   @GET
   @Path("{sessionHandle}")
   def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): SessionDetail = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     try {
       val session = backendService.sessionManager.getSession(sessionHandle)
       SessionDetail(
@@ -92,7 +91,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   def getInfo(
       @PathParam("sessionHandle") sessionHandleStr: String,
       @PathParam("infoType") infoType: Int): InfoDetail = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     val info = TGetInfoType.findByValue(infoType)
 
     try {
@@ -153,7 +152,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   @DELETE
   @Path("{sessionHandle}")
   def closeSession(@PathParam("sessionHandle") sessionHandleStr: String): Response = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     backendService.closeSession(sessionHandle)
     Response.ok().build()
   }
@@ -168,7 +167,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   def executeStatement(
       @PathParam("sessionHandle") sessionHandleStr: String,
       request: StatementRequest): OperationHandle = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     try {
       backendService.executeStatement(
         sessionHandle,
@@ -189,7 +188,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   @POST
   @Path("{sessionHandle}/operations/typeInfo")
   def getTypeInfo(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     try {
       backendService.getTypeInfo(sessionHandle)
     } catch {
@@ -206,7 +205,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   @POST
   @Path("{sessionHandle}/operations/catalogs")
   def getCatalogs(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     try {
       backendService.getCatalogs(sessionHandle)
     } catch {
@@ -225,7 +224,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   def getSchemas(
       @PathParam("sessionHandle") sessionHandleStr: String,
       request: GetSchemasRequest): OperationHandle = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     try {
       backendService.getSchemas(sessionHandle, request.catalogName, request.schemaName)
     } catch {
@@ -244,7 +243,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   def getTables(
       @PathParam("sessionHandle") sessionHandleStr: String,
       request: GetTablesRequest): OperationHandle = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     try {
       backendService.getTables(
         sessionHandle,
@@ -266,7 +265,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   @POST
   @Path("{sessionHandle}/operations/tableTypes")
   def getTableTypes(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     try {
       backendService.getTableTypes(sessionHandle)
     } catch {
@@ -285,7 +284,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   def getColumns(
       @PathParam("sessionHandle") sessionHandleStr: String,
       request: GetColumnsRequest): OperationHandle = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     try {
       backendService.getColumns(
         sessionHandle,
@@ -309,7 +308,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
   def getFunctions(
       @PathParam("sessionHandle") sessionHandleStr: String,
       request: GetFunctionsRequest): OperationHandle = {
-    val sessionHandle = getSessionHandle(sessionHandleStr)
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
     try {
       backendService.getFunctions(
         sessionHandle,
@@ -322,7 +321,72 @@ private[v1] class SessionsResource extends ApiRequestContext {
     }
   }
 
-  def getSessionHandle(sessionHandleStr: String): SessionHandle = {
+  /**
+   * Closes the operation identified by `operationHandleStr` through the session identified
+   * by `sessionHandleStr`.
+   *
+   * @param sessionHandleStr serialized session handle taken from the request path
+   * @param operationHandleStr serialized operation handle taken from the request path
+   * @return the handle of the operation that was closed
+   * @throws NotFoundException if the operation handle cannot be parsed, or if looking up
+   *                           the session or closing the operation fails
+   */
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON)),
+    description = "Close an operation")
+  @DELETE
+  @Path("{sessionHandle}/operations/{operationHandle}")
+  def closeOperation(
+      @PathParam("sessionHandle") sessionHandleStr: String,
+      @PathParam("operationHandle") operationHandleStr: String): OperationHandle = {
+    val sessionHandle = parseSessionHandle(sessionHandleStr)
+    val operationHandle = parseOperationHandle(operationHandleStr)
+    try {
+      backendService.sessionManager.getSession(sessionHandle).closeOperation(operationHandle)
+      operationHandle
+    } catch {
+      case NonFatal(e) =>
+        // Record the cause before mapping it to a 404; otherwise the reason is lost.
+        error(s"Error closing operation $operationHandleStr in session $sessionHandleStr.", e)
+        throw new NotFoundException("Error closing an operation")
+    }
+  }
+
+  /**
+   * Returns the details of the operation identified by `operationHandleStr`.
+   *
+   * NOTE(review): `sessionHandleStr` is bound from the path but never used; the operation is
+   * looked up through the operation manager by its own handle only. Confirm whether the
+   * session should be validated as well.
+   *
+   * @param sessionHandleStr serialized session handle taken from the request path (unused)
+   * @param operationHandleStr serialized operation handle taken from the request path
+   * @return the operation's `shouldRunAsync`, `isTimedOut` and `getStatus` values
+   * @throws NotFoundException if the operation handle cannot be parsed or the lookup fails
+   */
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON)),
+    description =
+      "Get an operation detail with a given session identifier and operation identifier")
+  @GET
+  @Path("{sessionHandle}/operations/{operationHandle}")
+  def getOperationHandle(
+      @PathParam("sessionHandle") sessionHandleStr: String,
+      @PathParam("operationHandle") operationHandleStr: String): OperationDetail = {
+    val operationHandle = parseOperationHandle(operationHandleStr)
+    try {
+      val operation = backendService.sessionManager.operationManager.getOperation(operationHandle)
+      OperationDetail(operation.shouldRunAsync, operation.isTimedOut, operation.getStatus)
+    } catch {
+      case NonFatal(e) =>
+        // Record the cause before mapping it to a 404; otherwise the reason is lost.
+        error(s"Error getting operation detail by $operationHandleStr.", e)
+        throw new NotFoundException("Error getting an operation detail")
+    }
+  }
+
+  /**
+   * Parses a serialized operation handle of the form
+   * `publicId|secretId|protocolVersion|operationType` into an [[OperationHandle]].
+   *
+   * @param operationHandleStr the `|`-separated handle string
+   * @return the parsed operation handle
+   * @throws NotFoundException if the string does not have exactly 4 parts or any part
+   *                           cannot be converted
+   */
+  def parseOperationHandle(operationHandleStr: String): OperationHandle = {
+    try {
+      val operationHandleParts = operationHandleStr.split("\\|")
+      require(
+        operationHandleParts.size == 4,
+        s"Expected 4 parameters but found ${operationHandleParts.size}.")
+
+      val handleIdentifier = new HandleIdentifier(
+        UUID.fromString(operationHandleParts(0)),
+        UUID.fromString(operationHandleParts(1)))
+
+      // findByValue yields null for a value that maps to no protocol version; reject it
+      // here rather than building a handle with a null protocol.
+      val protocolVersion = TProtocolVersion.findByValue(operationHandleParts(2).toInt)
+      require(
+        protocolVersion != null,
+        s"Unknown protocol version ${operationHandleParts(2)}.")
+      val operationType = OperationType.withName(operationHandleParts(3))
+
+      new OperationHandle(handleIdentifier, operationType, protocolVersion)
+    } catch {
+      case NonFatal(e) =>
+        error(s"Error getting operationHandle by $operationHandleStr.", e)
+        throw new NotFoundException(s"Error getting operationHandle by $operationHandleStr.")
+    }
+  }
+
+  def parseSessionHandle(sessionHandleStr: String): SessionHandle = {
     try {
       val splitSessionHandle = sessionHandleStr.split("\\|")
       val handleIdentifier = new HandleIdentifier(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index 6721370..f458446 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.server.api.v1
 
+import org.apache.kyuubi.operation.OperationStatus
 import org.apache.kyuubi.session.SessionHandle
 
 case class SessionOpenCount(openSessionCount: Int)
@@ -77,3 +78,8 @@ case class GetFunctionsRequest(
     catalogName: String,
     schemaName: String,
     functionName: String)
+
+case class OperationDetail( // operation summary returned by the GET operation endpoint
+    shouldRunAsync: Boolean, // filled from operation.shouldRunAsync
+    isTimedOut: Boolean, // filled from operation.isTimedOut
+    operationStatus: OperationStatus) // filled from operation.getStatus
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index d489ee5..a84e606 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -24,7 +24,7 @@ import javax.ws.rs.core.{MediaType, Response}
 import scala.concurrent.duration._
 
 import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
-import org.apache.kyuubi.operation.{OperationHandle, OperationType}
+import org.apache.kyuubi.operation.{OperationHandle, OperationState, OperationType}
 import org.apache.kyuubi.session.SessionHandle
 
 class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@@ -300,4 +300,92 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
       assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
     }
   }
+
+  // Opens a session, runs a catalogs operation, then checks that the operation can be
+  // fetched by its serialized handle and that a handle with a mismatching type yields 404.
+  test("test get an operation status by identifier") {
+    val requestObj = SessionOpenRequest(
+      1,
+      "admin",
+      "123456",
+      "localhost",
+      Map("testConfig" -> "testValue"))
+
+    withKyuubiRestServer { (_, _, _, webTarget) =>
+      var response: Response = webTarget.path("api/v1/sessions")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+
+      val sessionHandle = response.readEntity(classOf[SessionHandle])
+      val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
+        s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
+
+      val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
+
+      response = webTarget.path(s"$pathPrefix/operations/catalogs")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == response.getStatus)
+      val operationHandle = response.readEntity(classOf[OperationHandle])
+      assert(operationHandle.typ == OperationType.GET_CATALOGS)
+
+      val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
+        s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
+        s"${operationHandle.typ.toString}"
+
+      response = webTarget.path(s"$pathPrefix/operations/$serializedOperationHandle")
+        .request(MediaType.APPLICATION_JSON_TYPE).get()
+      // Check the status first so a failed request is reported as a status mismatch rather
+      // than as an entity deserialization error.
+      assert(200 == response.getStatus)
+      val operationDetail = response.readEntity(classOf[OperationDetail])
+      assert(operationDetail.operationStatus.state == OperationState.FINISHED)
+
+      // Invalid operationHandleStr: same identifiers, but a different operation type
+      val invalidOperationHandle = s"${operationHandle.identifier.publicId}|" +
+        s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|GET_TYPE_INFO"
+      response = webTarget.path(s"$pathPrefix/operations/$invalidOperationHandle")
+        .request(MediaType.APPLICATION_JSON_TYPE).get()
+      assert(404 == response.getStatus)
+    }
+  }
+
+  // Opens a session, runs a catalogs operation, closes it through the DELETE endpoint and
+  // then checks that fetching the same operation yields 404.
+  test("test close an operation") {
+    val openRequest = SessionOpenRequest(
+      1,
+      "admin",
+      "123456",
+      "localhost",
+      Map("testConfig" -> "testValue"))
+
+    withKyuubiRestServer { (_, _, _, webTarget) =>
+      val openResponse: Response = webTarget.path("api/v1/sessions")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(openRequest, MediaType.APPLICATION_JSON_TYPE))
+
+      val session = openResponse.readEntity(classOf[SessionHandle])
+      val sessionPart = List(
+        session.identifier.publicId.toString,
+        session.identifier.secretId.toString,
+        session.protocol.getValue.toString).mkString("|")
+      val sessionPath = s"api/v1/sessions/$sessionPart"
+
+      val catalogsResponse = webTarget.path(s"$sessionPath/operations/catalogs")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+      assert(200 == catalogsResponse.getStatus)
+      val operation = catalogsResponse.readEntity(classOf[OperationHandle])
+      assert(operation.typ == OperationType.GET_CATALOGS)
+
+      val operationPart = List(
+        operation.identifier.publicId.toString,
+        operation.identifier.secretId.toString,
+        operation.protocol.getValue.toString,
+        operation.typ.toString).mkString("|")
+      val operationPath = s"$sessionPath/operations/$operationPart"
+
+      val closeResponse = webTarget.path(operationPath)
+        .request(MediaType.APPLICATION_JSON_TYPE).delete()
+      assert(200 == closeResponse.getStatus)
+
+      // verify operationHandle
+      val lookupResponse = webTarget.path(operationPath)
+        .request(MediaType.APPLICATION_JSON_TYPE).get()
+      assert(404 == lookupResponse.getStatus)
+    }
+  }
 }