You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/12/08 10:12:14 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1516] Implement api: /${version}/operations/${operation_identifier}

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 81b14ea  [KYUUBI #1516] Implement api: /${version}/operations/${operation_identifier}
81b14ea is described below

commit 81b14eadc837b76ffc453cb8ce9e86363a2d27b3
Author: simon <zh...@cvte.com>
AuthorDate: Wed Dec 8 18:12:02 2021 +0800

    [KYUUBI #1516] Implement api: /${version}/operations/${operation_identifier}
    
    ### _Why are the changes needed?_
    This is a subtask of umbrella issue #KPIP-1
    
    /${version}/operations/${operation_identifier}
    - desc: apply an action for an operation based on a given session identifier and operation identifier
    - method: PUT
    - params:
    - action: "CANCEL" or "ADD" or "CLOSE"
    - returns: status code
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #1517 from simon824/oa.
    
    Closes #1516
    
    94742c67 [simon] fix desc
    fffe3e02 [simon] fix desc
    f9415507 [simon] fix
    f455995f [simon] operationAction
    72bcf5f3 [simon] init
    
    Authored-by: simon <zh...@cvte.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../kyuubi/server/api/v1/OperationsResource.scala  | 29 +++++++-
 .../org/apache/kyuubi/server/api/v1/dto.scala      |  2 +
 .../server/api/v1/OperationsResourceSuite.scala    | 87 ++++++++++++++--------
 3 files changed, 88 insertions(+), 30 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
index a910979..003499d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.server.api.v1
 
 import javax.ws.rs.{GET, Path, PathParam, Produces, _}
-import javax.ws.rs.core.MediaType
+import javax.ws.rs.core.{MediaType, Response}
 
 import scala.util.control.NonFatal
 
@@ -26,6 +26,7 @@ import io.swagger.v3.oas.annotations.media.Content
 import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
 
+import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.operation.OperationHandle.parseOperationHandle
 import org.apache.kyuubi.server.api.ApiRequestContext
 
@@ -52,4 +53,30 @@ private[v1] class OperationsResource extends ApiRequestContext {
         throw new NotFoundException(s"Error getting an operation detail")
     }
   }
+
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON)),
+    description =
+      "apply an action for an operation")
+  @PUT
+  @Path("{operationHandle}")
+  def applyOpAction(
+      request: OpActionRequest,
+      @PathParam("operationHandle") operationHandleStr: String): Response = {
+    try {
+      val operationHandle = parseOperationHandle(operationHandleStr)
+      request.action.toLowerCase() match {
+        case "cancel" => backendService.cancelOperation(operationHandle)
+        case "close" => backendService.closeOperation(operationHandle)
+        case _ => throw KyuubiSQLException(s"Invalid action ${request.action}")
+      }
+      Response.ok().build()
+    } catch {
+      case NonFatal(_) =>
+        throw new NotFoundException(s"Error applying ${request.action} " +
+          s"for operation handle $operationHandleStr")
+    }
+  }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index f458446..9e32300 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -83,3 +83,5 @@ case class OperationDetail(
     shouldRunAsync: Boolean,
     isTimedOut: Boolean,
     operationStatus: OperationStatus)
+
+case class OpActionRequest(action: String)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
index 98440c0..b342b64 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
@@ -17,56 +17,85 @@
 
 package org.apache.kyuubi.server.api.v1
 
-import javax.ws.rs.client.Entity
+import javax.ws.rs.client.{Entity, WebTarget}
 import javax.ws.rs.core.{MediaType, Response}
 
 import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
-import org.apache.kyuubi.operation.{OperationHandle, OperationState, OperationType}
+import org.apache.kyuubi.operation.{OperationHandle, OperationState}
 import org.apache.kyuubi.session.SessionHandle
 
 class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
 
   test("test get an operation detail by identifier") {
-    val requestObj = SessionOpenRequest(
-      1,
-      "admin",
-      "123456",
-      "localhost",
-      Map("testConfig" -> "testValue"))
-
     withKyuubiRestServer { (_, _, _, webTarget) =>
-      var response: Response = webTarget.path("api/v1/sessions")
-        .request(MediaType.APPLICATION_JSON_TYPE)
-        .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+      val opHandleStr = getOpHandleStr(webTarget, "catalogs")
+
+      var response = webTarget.path(s"api/v1/operations/$opHandleStr")
+        .request(MediaType.APPLICATION_JSON_TYPE).get()
+      val operationDetail = response.readEntity(classOf[OperationDetail])
+      assert(200 == response.getStatus)
+      assert(operationDetail.operationStatus.state == OperationState.FINISHED)
+
+      // Invalid operationHandleStr
+      val invalidOperationHandle = opHandleStr.replaceAll("GET_CATALOGS", "GET_TYPE_INFO")
+      response = webTarget.path(s"api/v1/operations/$invalidOperationHandle")
+        .request(MediaType.APPLICATION_JSON_TYPE).get()
+      assert(404 == response.getStatus)
 
-      val sessionHandle = response.readEntity(classOf[SessionHandle])
-      val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
-        s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
+    }
+  }
 
-      response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/operations/catalogs")
+  test("test apply an action for an operation") {
+    withKyuubiRestServer { (_, _, _, webTarget: WebTarget) =>
+      val opHandleStr = getOpHandleStr(webTarget, "catalogs")
+
+      var response = webTarget.path(s"api/v1/operations/$opHandleStr")
         .request(MediaType.APPLICATION_JSON_TYPE)
-        .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+        .put(Entity.entity(OpActionRequest("cancel"), MediaType.APPLICATION_JSON_TYPE))
       assert(200 == response.getStatus)
-      var operationHandle = response.readEntity(classOf[OperationHandle])
-      assert(operationHandle.typ == OperationType.GET_CATALOGS)
-
-      val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
-        s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
-        s"${operationHandle.typ.toString}"
 
-      response = webTarget.path(s"api/v1/operations/$serializedOperationHandle")
+      response = webTarget.path(s"api/v1/operations/$opHandleStr")
         .request(MediaType.APPLICATION_JSON_TYPE).get()
       val operationDetail = response.readEntity(classOf[OperationDetail])
+      assert(operationDetail.operationStatus.state == OperationState.FINISHED ||
+        operationDetail.operationStatus.state == OperationState.CANCELED)
+
+      response = webTarget.path(s"api/v1/operations/$opHandleStr")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .put(Entity.entity(OpActionRequest("close"), MediaType.APPLICATION_JSON_TYPE))
       assert(200 == response.getStatus)
-      assert(operationDetail.operationStatus.state == OperationState.FINISHED)
 
-      // Invalid operationHandleStr
-      val invalidOperationHandle = s"${operationHandle.identifier.publicId}|" +
-        s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|GET_TYPE_INFO"
-      response = webTarget.path(s"api/v1/operations/$invalidOperationHandle")
+      response = webTarget.path(s"api/v1/operations/$opHandleStr")
         .request(MediaType.APPLICATION_JSON_TYPE).get()
       assert(404 == response.getStatus)
 
     }
   }
+
+  def getOpHandleStr(webTarget: WebTarget, operationType: String): String = {
+    val requestObj = SessionOpenRequest(
+      1,
+      "admin",
+      "123456",
+      "localhost",
+      Map("testConfig" -> "testValue"))
+
+    var response: Response = webTarget.path("api/v1/sessions")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+    val sessionHandle = response.readEntity(classOf[SessionHandle])
+    val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
+      s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
+
+    response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/operations/$operationType")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+    assert(200 == response.getStatus)
+    val operationHandle = response.readEntity(classOf[OperationHandle])
+
+    s"${operationHandle.identifier.publicId}|" +
+      s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
+      s"${operationHandle.typ.toString}"
+
+  }
 }