You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/12/15 02:03:14 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1539] Replace operationDetail with KyuubiEvent
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 22d967e [KYUUBI #1539] Replace operationDetail with KyuubiEvent
22d967e is described below
commit 22d967e9c0315589532cb64eec1786491515d29f
Author: simon <zh...@cvte.com>
AuthorDate: Wed Dec 15 10:03:05 2021 +0800
[KYUUBI #1539] Replace operationDetail with KyuubiEvent
### _Why are the changes needed?_
#1539
Replace operationDetail with KyuubiEvent
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1540 from simon824/event.
Closes #1539
9d4e2a04 [Simon] Merge branch 'apache:master' into event
0cd7d15a [simon] replace operationDetail with OperationEvent
adf327e8 [simon] replace operationDetail with OperationEvent
06b6ef8c [simon] replace operationDetail with OperationEvent
b0973e20 [simon] replace operationDetail with OperationEvent
cf83fafb [simon] replace operationDetail with OperationEvent
bf955fbf [simon] replace operationDetail with OperationEvent
49c17988 [simon] replace operationDetail with OperationEvent
8990b701 [simon] replace operationDetail with OperationEvent
03937c2c [simon] replace operationDetail with OperationEvent
a3981776 [simon] replace operationDetail with KyuubiEvent
fc5d747d [simon] replace operationDetail with KyuubiEvent
344eba6a [simon] Revert
b4461411 [simon] codestyle
1d758920 [simon] replace operationDetail with KyuubiEvent
258303ae [simon] init
Lead-authored-by: simon <zh...@cvte.com>
Co-authored-by: Simon <36...@qq.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../engine/spark/operation/ExecuteStatement.scala | 2 +-
.../engine/spark/operation/PlanOnlyStatement.scala | 2 +-
.../kyuubi/operation/AbstractOperation.scala | 2 +-
...ementEvent.scala => KyuubiOperationEvent.scala} | 38 +++++------
.../apache/kyuubi/operation/ExecuteStatement.scala | 6 +-
.../org/apache/kyuubi/operation/LaunchEngine.scala | 2 +-
.../kyuubi/server/api/v1/OperationsResource.scala | 18 ++---
.../org/apache/kyuubi/server/api/v1/dto.scala | 6 --
.../kyuubi/events/EventLoggingServiceSuite.scala | 2 +-
.../server/api/v1/OperationsResourceSuite.scala | 77 ++++++++++++----------
.../server/api/v1/SessionsResourceSuite.scala | 2 +-
11 files changed, 80 insertions(+), 77 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 81f123b..fff226a 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -37,7 +37,7 @@ import org.apache.kyuubi.util.ThreadUtils
class ExecuteStatement(
session: Session,
- override protected val statement: String,
+ override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long,
incrementalCollect: Boolean)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index 95cfdae..a454164 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.session.Session
*/
class PlanOnlyStatement(
session: Session,
- override protected val statement: String,
+ override val statement: String,
mode: OperationMode)
extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 67d78e4..b89f3f7 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -58,7 +58,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
def getBackgroundHandle: Future[_] = _backgroundHandle
- protected def statement: String = opType.toString
+ def statement: String = opType.toString
protected def setHasResultSet(hasResultSet: Boolean): Unit = {
this.hasResultSet = hasResultSet
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
similarity index 68%
rename from kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
rename to kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
index e405954..2a9bfd7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
@@ -18,30 +18,30 @@
package org.apache.kyuubi.events
import org.apache.kyuubi.Utils
-import org.apache.kyuubi.operation.{ExecuteStatement, OperationHandle}
+import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
/**
- * A [[KyuubiStatementEvent]] used to tracker the lifecycle of a statement at server side.
+ * A [[KyuubiOperationEvent]] used to track the lifecycle of an operation at server side.
* <ul>
- * <li>Statement Basis</li>
- * <li>Statement Live Status</li>
+ * <li>Operation Basis</li>
+ * <li>Operation Live Status</li>
* <li>Parent Session Id</li>
* </ul>
*
- * @param statementId the unique identifier of a single statement
- * @param remoteId the unique identifier of a single statement at engine side
+ * @param statementId the unique identifier of a single operation
+ * @param remoteId the unique identifier of a single operation at engine side
* @param statement the sql that you execute
* @param shouldRunAsync the flag indicating whether the query runs synchronously or not
* @param state the current operation state
* @param eventTime the time when the event created & logged
* @param createTime the time for changing to the current operation state
- * @param startTime the time the query start to time of this statement
+ * @param startTime the time the query start to time of this operation
* @param completeTime time time the query ends
* @param exception: caught exception if have
* @param sessionId the identifier of the parent session
* @param sessionUser the authenticated client user
*/
-case class KyuubiStatementEvent private (
+case class KyuubiOperationEvent private (
statementId: String,
remoteId: String,
statement: String,
@@ -55,25 +55,25 @@ case class KyuubiStatementEvent private (
sessionId: String,
sessionUser: String) extends KyuubiServerEvent {
- // statement events are partitioned by the date when the corresponding operations are
+ // operation events are partitioned by the date when the corresponding operations are
// created.
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
}
-object KyuubiStatementEvent {
+object KyuubiOperationEvent {
/**
- * Shorthand for instantiating a statement event with a [[ExecuteStatement]] instance
+ * Shorthand for instantiating an operation event with a [[KyuubiOperation]] instance
*/
- def apply(statement: ExecuteStatement): KyuubiStatementEvent = {
- val session = statement.getSession
- val status = statement.getStatus
- new KyuubiStatementEvent(
- statement.getHandle.identifier.toString,
- Option(statement.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
- statement.statement,
- statement.shouldRunAsync,
+ def apply(operation: KyuubiOperation): KyuubiOperationEvent = {
+ val session = operation.getSession
+ val status = operation.getStatus
+ new KyuubiOperationEvent(
+ operation.getHandle.identifier.toString,
+ Option(operation.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
+ operation.statement,
+ operation.shouldRunAsync,
status.state.name(),
status.lastModified,
status.create,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 398056b..da4cfc6 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -25,7 +25,7 @@ import org.apache.thrift.TException
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.events.KyuubiStatementEvent
+import org.apache.kyuubi.events.KyuubiOperationEvent
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
@@ -38,7 +38,7 @@ class ExecuteStatement(
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session) {
- EventLoggingService.onEvent(KyuubiStatementEvent(this))
+ EventLoggingService.onEvent(KyuubiOperationEvent(this))
final private val _operationLog: OperationLog =
if (shouldRunAsync) {
@@ -173,7 +173,7 @@ class ExecuteStatement(
override def setState(newState: OperationState): Unit = {
super.setState(newState)
- EventLoggingService.onEvent(KyuubiStatementEvent(this))
+ EventLoggingService.onEvent(KyuubiOperationEvent(this))
}
override def close(): Unit = {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 3e106ed..8e087df 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -23,7 +23,7 @@ import org.apache.kyuubi.session.KyuubiSessionImpl
class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Boolean)
extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
- override protected def statement: String = "LAUNCH_ENGINE"
+ override def statement: String = "LAUNCH_ENGINE"
private lazy val _operationLog: OperationLog =
if (shouldRunAsync) {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
index 003499d..17abf51 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
@@ -27,6 +27,8 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.events.KyuubiOperationEvent
+import org.apache.kyuubi.operation.KyuubiOperation
import org.apache.kyuubi.operation.OperationHandle.parseOperationHandle
import org.apache.kyuubi.server.api.ApiRequestContext
@@ -39,18 +41,18 @@ private[v1] class OperationsResource extends ApiRequestContext {
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON)),
description =
- "Get an operation detail with a given session identifier and operation identifier")
+ "Get an operation event")
@GET
- @Path("{operationHandle}")
- def getOperationDetail(
- @PathParam("operationHandle") operationHandleStr: String): OperationDetail = {
+ @Path("{operationHandle}/event")
+ def getOperationEvent(
+ @PathParam("operationHandle") operationHandleStr: String): KyuubiOperationEvent = {
try {
- val operation = backendService.sessionManager.operationManager
- .getOperation(parseOperationHandle(operationHandleStr))
- OperationDetail(operation.shouldRunAsync, operation.isTimedOut, operation.getStatus)
+ val opHandle = parseOperationHandle(operationHandleStr)
+ val operation = backendService.sessionManager.operationManager.getOperation(opHandle)
+ KyuubiOperationEvent(operation.asInstanceOf[KyuubiOperation])
} catch {
case NonFatal(_) =>
- throw new NotFoundException(s"Error getting an operation detail")
+ throw new NotFoundException(s"Error getting an operation event")
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index 9e32300..25a107a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.server.api.v1
-import org.apache.kyuubi.operation.OperationStatus
import org.apache.kyuubi.session.SessionHandle
case class SessionOpenCount(openSessionCount: Int)
@@ -79,9 +78,4 @@ case class GetFunctionsRequest(
schemaName: String,
functionName: String)
-case class OperationDetail(
- shouldRunAsync: Boolean,
- isTimedOut: Boolean,
- operationStatus: OperationStatus)
-
case class OpActionRequest(action: String)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
index fe52472..b8ecdd1 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -56,7 +56,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer with HiveJDBCTestHelper
test("round-trip for logging and querying statement events for both kyuubi server and engine") {
val hostName = InetAddress.getLocalHost.getCanonicalHostName
val serverStatementEventPath =
- Paths.get(serverLogRoot, "kyuubi_statement", s"day=$currentDate", s"server-$hostName.json")
+ Paths.get(serverLogRoot, "kyuubi_operation", s"day=$currentDate", s"server-$hostName.json")
val engineStatementEventPath =
Paths.get(engineLogRoot, "spark_statement", s"day=$currentDate", "*.json")
val sql = "select timestamp'2021-06-01'"
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
index b342b64..a9d04f5 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
@@ -18,84 +18,91 @@
package org.apache.kyuubi.server.api.v1
import javax.ws.rs.client.{Entity, WebTarget}
-import javax.ws.rs.core.{MediaType, Response}
+import javax.ws.rs.core.MediaType
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
-import org.apache.kyuubi.operation.{OperationHandle, OperationState}
-import org.apache.kyuubi.session.SessionHandle
+import org.apache.kyuubi.events.KyuubiOperationEvent
+import org.apache.kyuubi.operation.{ExecuteStatement, GetCatalogs, OperationState, OperationType}
+import org.apache.kyuubi.operation.OperationType.OperationType
+import org.apache.kyuubi.server.KyuubiRestFrontendService
class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
- test("test get an operation detail by identifier") {
- withKyuubiRestServer { (_, _, _, webTarget) =>
- val opHandleStr = getOpHandleStr(webTarget, "catalogs")
+ test("test get an operation event") {
+ withKyuubiRestServer { (fe, _, _, webTarget) =>
+ val catalogsHandleStr = getOpHandleStr(fe, OperationType.GET_CATALOGS)
+ var response = webTarget.path(s"api/v1/operations/$catalogsHandleStr/event")
+ .request(MediaType.APPLICATION_JSON_TYPE).get()
+ val operationEvent = response.readEntity(classOf[KyuubiOperationEvent])
+ assert(200 == response.getStatus)
+ assert(operationEvent.state == OperationState.INITIALIZED.name())
- var response = webTarget.path(s"api/v1/operations/$opHandleStr")
+ val statementHandleStr = getOpHandleStr(fe, OperationType.EXECUTE_STATEMENT)
+ response = webTarget.path(s"api/v1/operations/$statementHandleStr/event")
.request(MediaType.APPLICATION_JSON_TYPE).get()
- val operationDetail = response.readEntity(classOf[OperationDetail])
+ val statementEvent = response.readEntity(classOf[KyuubiOperationEvent])
assert(200 == response.getStatus)
- assert(operationDetail.operationStatus.state == OperationState.FINISHED)
+ assert(statementEvent.state == OperationState.INITIALIZED.name())
// Invalid operationHandleStr
- val invalidOperationHandle = opHandleStr.replaceAll("GET_CATALOGS", "GET_TYPE_INFO")
- response = webTarget.path(s"api/v1/operations/$invalidOperationHandle")
+ val invalidOperationHandle =
+ statementHandleStr.replaceAll("EXECUTE_STATEMENT", "GET_TYPE_INFO")
+ response = webTarget.path(s"api/v1/operations/$invalidOperationHandle/event")
.request(MediaType.APPLICATION_JSON_TYPE).get()
assert(404 == response.getStatus)
-
}
}
test("test apply an action for an operation") {
- withKyuubiRestServer { (_, _, _, webTarget: WebTarget) =>
- val opHandleStr = getOpHandleStr(webTarget, "catalogs")
+ withKyuubiRestServer { (fe, _, _, webTarget: WebTarget) =>
+ val opHandleStr = getOpHandleStr(fe, OperationType.EXECUTE_STATEMENT)
var response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE)
.put(Entity.entity(OpActionRequest("cancel"), MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
- response = webTarget.path(s"api/v1/operations/$opHandleStr")
+ response = webTarget.path(s"api/v1/operations/$opHandleStr/event")
.request(MediaType.APPLICATION_JSON_TYPE).get()
- val operationDetail = response.readEntity(classOf[OperationDetail])
- assert(operationDetail.operationStatus.state == OperationState.FINISHED ||
- operationDetail.operationStatus.state == OperationState.CANCELED)
+ val operationEvent = response.readEntity(classOf[KyuubiOperationEvent])
+ assert(operationEvent.state == OperationState.FINISHED.name() ||
+ operationEvent.state == OperationState.CANCELED.name())
response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE)
.put(Entity.entity(OpActionRequest("close"), MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
- response = webTarget.path(s"api/v1/operations/$opHandleStr")
+ response = webTarget.path(s"api/v1/operations/$opHandleStr/event")
.request(MediaType.APPLICATION_JSON_TYPE).get()
assert(404 == response.getStatus)
}
}
- def getOpHandleStr(webTarget: WebTarget, operationType: String): String = {
- val requestObj = SessionOpenRequest(
- 1,
+ def getOpHandleStr(fe: KyuubiRestFrontendService, typ: OperationType): String = {
+ val sessionManager = fe.be.sessionManager
+ val sessionHandle = sessionManager.openSession(
+ HIVE_CLI_SERVICE_PROTOCOL_V2,
"admin",
"123456",
"localhost",
Map("testConfig" -> "testValue"))
+ val session = sessionManager.getSession(sessionHandle)
- var response: Response = webTarget.path("api/v1/sessions")
- .request(MediaType.APPLICATION_JSON_TYPE)
- .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
- val sessionHandle = response.readEntity(classOf[SessionHandle])
- val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
- s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
-
- response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/operations/$operationType")
- .request(MediaType.APPLICATION_JSON_TYPE)
- .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
- assert(200 == response.getStatus)
- val operationHandle = response.readEntity(classOf[OperationHandle])
+ val op = typ match {
+ case OperationType.EXECUTE_STATEMENT =>
+ new ExecuteStatement(session, "show tables", true, 3000)
+ case OperationType.GET_CATALOGS =>
+ new GetCatalogs(session)
+ }
+ sessionManager.operationManager.addOperation(op)
+ val operationHandle = op.getHandle
s"${operationHandle.identifier.publicId}|" +
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
s"${operationHandle.typ.toString}"
-
}
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index 1549e17..119843a 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -336,7 +336,7 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(200 == response.getStatus)
// verify operation
- response = webTarget.path(s"api/v1/operations/$serializedOperationHandle")
+ response = webTarget.path(s"api/v1/operations/$serializedOperationHandle/event")
.request(MediaType.APPLICATION_JSON_TYPE).get()
assert(404 == response.getStatus)