You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/12/06 05:17:45 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1445] [Kyuubi #1444] Implement api: /${version}/sessions/${session_identifier}/operations/${operation_identifier}
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 73ac1b6 [KYUUBI #1445] [Kyuubi #1444] Implement api: /${version}/sessions/${session_identifier}/operations/${operation_identifier}
73ac1b6 is described below
commit 73ac1b63dbab7e46b34a495f4cdf17211c4f99af
Author: simon <zh...@cvte.com>
AuthorDate: Mon Dec 6 13:17:33 2021 +0800
[KYUUBI #1445] [Kyuubi #1444] Implement api: /${version}/sessions/${session_identifier}/operations/${operation_identifier}
### _Why are the changes needed?_
This is a subtask of umbrella issue #KPIP-1
/${version}/sessions/${session_identifier}/operations/${operation_identifier}
- desc: get an operation with a given session identifier and operation identifier
- method: GET
- params: none
- returns: an instance of OperationHandle
/${version}/sessions/${session_identifier}/operations/${operation_identifier}
- desc: remove operation based on a given session identifier and operation identifier
- method: DELETE
- params: none
- returns: an instance of OperationHandle
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #1445 from simon824/r5.
Closes #1445
196ea4c7 [simon] codestyle
df1c20e8 [simon] remove isHasOperation
6bdebab8 [simon] fix
609aba1b [simon] codestyle
dbae387d [simon] codestyle
7eb9eeb8 [simon] Merge remote-tracking branch 'upstream/master' into r5
8a9e78ca [simon] codestyle
15b00aa9 [simon] operation
b154cbc7 [simon] init
Authored-by: simon <zh...@cvte.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../kyuubi/server/api/v1/SessionsResource.scala | 94 ++++++++++++++++++----
.../org/apache/kyuubi/server/api/v1/dto.scala | 6 ++
.../server/api/v1/SessionsResourceSuite.scala | 90 ++++++++++++++++++++-
3 files changed, 174 insertions(+), 16 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index d52dabf..8e30377 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -18,8 +18,7 @@
package org.apache.kyuubi.server.api.v1
import java.util.UUID
-import javax.ws.rs._
-import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces}
+import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces, _}
import javax.ws.rs.core.{MediaType, Response}
import scala.collection.JavaConverters._
@@ -32,7 +31,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TProtocolVersion}
import org.apache.kyuubi.Utils.error
import org.apache.kyuubi.cli.HandleIdentifier
-import org.apache.kyuubi.operation.OperationHandle
+import org.apache.kyuubi.operation.{OperationHandle, OperationType}
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.session.SessionHandle
@@ -62,7 +61,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
@GET
@Path("{sessionHandle}")
def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): SessionDetail = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
val session = backendService.sessionManager.getSession(sessionHandle)
SessionDetail(
@@ -92,7 +91,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getInfo(
@PathParam("sessionHandle") sessionHandleStr: String,
@PathParam("infoType") infoType: Int): InfoDetail = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
val info = TGetInfoType.findByValue(infoType)
try {
@@ -153,7 +152,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
@DELETE
@Path("{sessionHandle}")
def closeSession(@PathParam("sessionHandle") sessionHandleStr: String): Response = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
backendService.closeSession(sessionHandle)
Response.ok().build()
}
@@ -168,7 +167,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
def executeStatement(
@PathParam("sessionHandle") sessionHandleStr: String,
request: StatementRequest): OperationHandle = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.executeStatement(
sessionHandle,
@@ -189,7 +188,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
@POST
@Path("{sessionHandle}/operations/typeInfo")
def getTypeInfo(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getTypeInfo(sessionHandle)
} catch {
@@ -206,7 +205,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
@POST
@Path("{sessionHandle}/operations/catalogs")
def getCatalogs(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getCatalogs(sessionHandle)
} catch {
@@ -225,7 +224,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getSchemas(
@PathParam("sessionHandle") sessionHandleStr: String,
request: GetSchemasRequest): OperationHandle = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getSchemas(sessionHandle, request.catalogName, request.schemaName)
} catch {
@@ -244,7 +243,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getTables(
@PathParam("sessionHandle") sessionHandleStr: String,
request: GetTablesRequest): OperationHandle = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getTables(
sessionHandle,
@@ -266,7 +265,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
@POST
@Path("{sessionHandle}/operations/tableTypes")
def getTableTypes(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getTableTypes(sessionHandle)
} catch {
@@ -285,7 +284,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getColumns(
@PathParam("sessionHandle") sessionHandleStr: String,
request: GetColumnsRequest): OperationHandle = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getColumns(
sessionHandle,
@@ -309,7 +308,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
def getFunctions(
@PathParam("sessionHandle") sessionHandleStr: String,
request: GetFunctionsRequest): OperationHandle = {
- val sessionHandle = getSessionHandle(sessionHandleStr)
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
try {
backendService.getFunctions(
sessionHandle,
@@ -322,7 +321,72 @@ private[v1] class SessionsResource extends ApiRequestContext {
}
}
- def getSessionHandle(sessionHandleStr: String): SessionHandle = {
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON)),
+ description = "Close an operation")
+ @DELETE
+ @Path("{sessionHandle}/operations/{operationHandle}")
+ def closeOperation(
+ @PathParam("sessionHandle") sessionHandleStr: String,
+ @PathParam("operationHandle") operationHandleStr: String): OperationHandle = {
+ val sessionHandle = parseSessionHandle(sessionHandleStr)
+ val operationHandle = parseOperationHandle(operationHandleStr)
+ try {
+ backendService.sessionManager.getSession(sessionHandle).closeOperation(operationHandle)
+ operationHandle
+ } catch {
+ case NonFatal(_) =>
+ throw new NotFoundException(s"Error closing an operation")
+ }
+ }
+
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON)),
+ description =
+ "Get an operation detail with a given session identifier and operation identifier")
+ @GET
+ @Path("{sessionHandle}/operations/{operationHandle}")
+ def getOperationHandle(
+ @PathParam("sessionHandle") sessionHandleStr: String,
+ @PathParam("operationHandle") operationHandleStr: String): OperationDetail = {
+ val operationHandle = parseOperationHandle(operationHandleStr)
+ try {
+ val operation = backendService.sessionManager.operationManager.getOperation(operationHandle)
+ OperationDetail(operation.shouldRunAsync, operation.isTimedOut, operation.getStatus)
+ } catch {
+ case NonFatal(e) =>
+ throw new NotFoundException(s"Error closing an operation")
+ }
+ }
+
+ def parseOperationHandle(operationHandleStr: String): OperationHandle = {
+ try {
+ val operationHandleParts = operationHandleStr.split("\\|")
+ require(
+ operationHandleParts.size == 4,
+ s"Expected 4 parameters but found ${operationHandleParts.size}.")
+
+ val handleIdentifier = new HandleIdentifier(
+ UUID.fromString(operationHandleParts(0)),
+ UUID.fromString(operationHandleParts(1)))
+
+ val protocolVersion = TProtocolVersion.findByValue(operationHandleParts(2).toInt)
+ val operationType = OperationType.withName(operationHandleParts(3))
+ val operationHandle = new OperationHandle(handleIdentifier, operationType, protocolVersion)
+
+ operationHandle
+ } catch {
+ case NonFatal(e) =>
+ error(s"Error getting operationHandle by $operationHandleStr.", e)
+ throw new NotFoundException(s"Error getting operationHandle by $operationHandleStr.")
+ }
+ }
+
+ def parseSessionHandle(sessionHandleStr: String): SessionHandle = {
try {
val splitSessionHandle = sessionHandleStr.split("\\|")
val handleIdentifier = new HandleIdentifier(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index 6721370..f458446 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.server.api.v1
+import org.apache.kyuubi.operation.OperationStatus
import org.apache.kyuubi.session.SessionHandle
case class SessionOpenCount(openSessionCount: Int)
@@ -77,3 +78,8 @@ case class GetFunctionsRequest(
catalogName: String,
schemaName: String,
functionName: String)
+
+case class OperationDetail(
+ shouldRunAsync: Boolean,
+ isTimedOut: Boolean,
+ operationStatus: OperationStatus)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index d489ee5..a84e606 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -24,7 +24,7 @@ import javax.ws.rs.core.{MediaType, Response}
import scala.concurrent.duration._
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
-import org.apache.kyuubi.operation.{OperationHandle, OperationType}
+import org.apache.kyuubi.operation.{OperationHandle, OperationState, OperationType}
import org.apache.kyuubi.session.SessionHandle
class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@@ -300,4 +300,92 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
}
}
+
+ test("test get an operation status by identifier") {
+ val requestObj = SessionOpenRequest(
+ 1,
+ "admin",
+ "123456",
+ "localhost",
+ Map("testConfig" -> "testValue"))
+
+ withKyuubiRestServer { (_, _, _, webTarget) =>
+ var response: Response = webTarget.path("api/v1/sessions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+
+ val sessionHandle = response.readEntity(classOf[SessionHandle])
+ val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
+ s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
+
+ val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
+
+ response = webTarget.path(s"$pathPrefix/operations/catalogs")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ var operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_CATALOGS)
+
+ val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
+ s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
+ s"${operationHandle.typ.toString}"
+
+ response = webTarget.path(s"$pathPrefix/operations/$serializedOperationHandle")
+ .request(MediaType.APPLICATION_JSON_TYPE).get()
+ val operationDetail = response.readEntity(classOf[OperationDetail])
+ assert(200 == response.getStatus)
+ assert(operationDetail.operationStatus.state == OperationState.FINISHED)
+
+ // Invalid operationHandleStr
+ val invalidOperationHandle = s"${operationHandle.identifier.publicId}|" +
+ s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|GET_TYPE_INFO"
+ response = webTarget.path(s"$pathPrefix/operations/$invalidOperationHandle")
+ .request(MediaType.APPLICATION_JSON_TYPE).get()
+ assert(404 == response.getStatus)
+
+ }
+ }
+
+ test("test close an operation") {
+ val requestObj = SessionOpenRequest(
+ 1,
+ "admin",
+ "123456",
+ "localhost",
+ Map("testConfig" -> "testValue"))
+
+ withKyuubiRestServer { (_, _, _, webTarget) =>
+ var response: Response = webTarget.path("api/v1/sessions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+
+ val sessionHandle = response.readEntity(classOf[SessionHandle])
+ val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
+ s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
+
+ val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
+
+ response = webTarget.path(s"$pathPrefix/operations/catalogs")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ var operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle.typ == OperationType.GET_CATALOGS)
+
+ val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
+ s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
+ s"${operationHandle.typ.toString}"
+
+ response = webTarget.path(s"$pathPrefix/operations/$serializedOperationHandle")
+ .request(MediaType.APPLICATION_JSON_TYPE).delete()
+ assert(200 == response.getStatus)
+
+ // verify operationHandle
+ response = webTarget.path(s"$pathPrefix/operations/$serializedOperationHandle")
+ .request(MediaType.APPLICATION_JSON_TYPE).get()
+ assert(404 == response.getStatus)
+
+ }
+ }
}