You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/12/08 10:12:14 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1516] Implement api: /${version}/operations/${operation_identifier}
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 81b14ea [KYUUBI #1516] Implement api: /${version}/operations/${operation_identifier}
81b14ea is described below
commit 81b14eadc837b76ffc453cb8ce9e86363a2d27b3
Author: simon <zh...@cvte.com>
AuthorDate: Wed Dec 8 18:12:02 2021 +0800
[KYUUBI #1516] Implement api: /${version}/operations/${operation_identifier}
### _Why are the changes needed?_
This is a subtask of umbrella issue #KPIP-1
/${version}/operations/${operation_identifier}
- desc: apply an action to an operation identified by the given operation identifier
- method: PUT
- params:
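- action: "CANCEL" or "ADD" or "CLOSE" (note: this commit handles only "cancel" and "close", matched case-insensitively; any other value, including "ADD", is rejected as an invalid action)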
- returns: status code
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1517 from simon824/oa.
Closes #1516
94742c67 [simon] fix desc
fffe3e02 [simon] fix desc
f9415507 [simon] fix
f455995f [simon] operationAction
72bcf5f3 [simon] init
Authored-by: simon <zh...@cvte.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../kyuubi/server/api/v1/OperationsResource.scala | 29 +++++++-
.../org/apache/kyuubi/server/api/v1/dto.scala | 2 +
.../server/api/v1/OperationsResourceSuite.scala | 87 ++++++++++++++--------
3 files changed, 88 insertions(+), 30 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
index a910979..003499d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.server.api.v1
import javax.ws.rs.{GET, Path, PathParam, Produces, _}
-import javax.ws.rs.core.MediaType
+import javax.ws.rs.core.{MediaType, Response}
import scala.util.control.NonFatal
@@ -26,6 +26,7 @@ import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.OperationHandle.parseOperationHandle
import org.apache.kyuubi.server.api.ApiRequestContext
@@ -52,4 +53,30 @@ private[v1] class OperationsResource extends ApiRequestContext {
throw new NotFoundException(s"Error getting an operation detail")
}
}
+
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON)),
+ description =
+ "apply an action for an operation")
+ @PUT
+ @Path("{operationHandle}")
+ def applyOpAction(
+ request: OpActionRequest,
+ @PathParam("operationHandle") operationHandleStr: String): Response = {
+ try {
+ val operationHandle = parseOperationHandle(operationHandleStr)
+ request.action.toLowerCase() match {
+ case "cancel" => backendService.cancelOperation(operationHandle)
+ case "close" => backendService.closeOperation(operationHandle)
+ case _ => throw KyuubiSQLException(s"Invalid action ${request.action}")
+ }
+ Response.ok().build()
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error applying ${request.action} " +
+ s"for operation handle $operationHandleStr")
+ }
+ }
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index f458446..9e32300 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -83,3 +83,5 @@ case class OperationDetail(
shouldRunAsync: Boolean,
isTimedOut: Boolean,
operationStatus: OperationStatus)
+
+case class OpActionRequest(action: String)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
index 98440c0..b342b64 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
@@ -17,56 +17,85 @@
package org.apache.kyuubi.server.api.v1
-import javax.ws.rs.client.Entity
+import javax.ws.rs.client.{Entity, WebTarget}
import javax.ws.rs.core.{MediaType, Response}
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
-import org.apache.kyuubi.operation.{OperationHandle, OperationState, OperationType}
+import org.apache.kyuubi.operation.{OperationHandle, OperationState}
import org.apache.kyuubi.session.SessionHandle
class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
test("test get an operation detail by identifier") {
- val requestObj = SessionOpenRequest(
- 1,
- "admin",
- "123456",
- "localhost",
- Map("testConfig" -> "testValue"))
-
withKyuubiRestServer { (_, _, _, webTarget) =>
- var response: Response = webTarget.path("api/v1/sessions")
- .request(MediaType.APPLICATION_JSON_TYPE)
- .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+ val opHandleStr = getOpHandleStr(webTarget, "catalogs")
+
+ var response = webTarget.path(s"api/v1/operations/$opHandleStr")
+ .request(MediaType.APPLICATION_JSON_TYPE).get()
+ val operationDetail = response.readEntity(classOf[OperationDetail])
+ assert(200 == response.getStatus)
+ assert(operationDetail.operationStatus.state == OperationState.FINISHED)
+
+ // Invalid operationHandleStr
+ val invalidOperationHandle = opHandleStr.replaceAll("GET_CATALOGS", "GET_TYPE_INFO")
+ response = webTarget.path(s"api/v1/operations/$invalidOperationHandle")
+ .request(MediaType.APPLICATION_JSON_TYPE).get()
+ assert(404 == response.getStatus)
- val sessionHandle = response.readEntity(classOf[SessionHandle])
- val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
- s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
+ }
+ }
- response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/operations/catalogs")
+ test("test apply an action for an operation") {
+ withKyuubiRestServer { (_, _, _, webTarget: WebTarget) =>
+ val opHandleStr = getOpHandleStr(webTarget, "catalogs")
+
+ var response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE)
- .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+ .put(Entity.entity(OpActionRequest("cancel"), MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
- var operationHandle = response.readEntity(classOf[OperationHandle])
- assert(operationHandle.typ == OperationType.GET_CATALOGS)
-
- val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
- s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
- s"${operationHandle.typ.toString}"
- response = webTarget.path(s"api/v1/operations/$serializedOperationHandle")
+ response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE).get()
val operationDetail = response.readEntity(classOf[OperationDetail])
+ assert(operationDetail.operationStatus.state == OperationState.FINISHED ||
+ operationDetail.operationStatus.state == OperationState.CANCELED)
+
+ response = webTarget.path(s"api/v1/operations/$opHandleStr")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .put(Entity.entity(OpActionRequest("close"), MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
- assert(operationDetail.operationStatus.state == OperationState.FINISHED)
- // Invalid operationHandleStr
- val invalidOperationHandle = s"${operationHandle.identifier.publicId}|" +
- s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|GET_TYPE_INFO"
- response = webTarget.path(s"api/v1/operations/$invalidOperationHandle")
+ response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE).get()
assert(404 == response.getStatus)
}
}
+
+ def getOpHandleStr(webTarget: WebTarget, operationType: String): String = {
+ val requestObj = SessionOpenRequest(
+ 1,
+ "admin",
+ "123456",
+ "localhost",
+ Map("testConfig" -> "testValue"))
+
+ var response: Response = webTarget.path("api/v1/sessions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+ val sessionHandle = response.readEntity(classOf[SessionHandle])
+ val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
+ s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
+
+ response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/operations/$operationType")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ val operationHandle = response.readEntity(classOf[OperationHandle])
+
+ s"${operationHandle.identifier.publicId}|" +
+ s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
+ s"${operationHandle.typ.toString}"
+
+ }
}