You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by vi...@apache.org on 2022/02/09 03:31:21 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1838] Clean up query results after query operations finish
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 03c84f8 [KYUUBI #1838] Clean up query results after query operations finish
03c84f8 is described below
commit 03c84f8342d2af2c5b95c68d212511ca80a319e5
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Wed Feb 9 11:31:09 2022 +0800
[KYUUBI #1838] Clean up query results after query operations finish
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
Query results are cached in memory, so we should clean them up once all rows have been fetched.
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
This is a sub-task of KPIP-2 #1322.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1859 from link3280/feature/KYUUBI-1838.
Closes #1838
3cef0766 [Paul Lin] [KYUUBI #1838] Improve logging message syntax
c6253de8 [Paul Lin] [KYUUBI #1838] Log cleanup exceptions
e0b1866f [Paul Lin] [KYUUBI #1838] Clean up query results after query operations finish
Authored-by: Paul Lin <pa...@gmail.com>
Signed-off-by: yanghua <ya...@gmail.com>
---
.../engine/flink/operation/ExecuteStatement.scala | 68 ++++++++++++++--------
1 file changed, 43 insertions(+), 25 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 51e8415..06ae1fa 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -119,33 +119,42 @@ class ExecuteStatement(
}
private def runQueryOperation(operation: QueryOperation): Unit = {
- val resultDescriptor = executor.executeQuery(sessionId, operation)
-
- val resultID = resultDescriptor.getResultId
-
- val rows = new ArrayBuffer[Row]()
- var loop = true
- while (loop) {
- Thread.sleep(50) // slow the processing down
-
- val result = executor.snapshotResult(sessionId, resultID, 2)
- result.getType match {
- case TypedResult.ResultType.PAYLOAD =>
- rows.clear()
- (1 to result.getPayload).foreach { page =>
- rows ++= executor.retrieveResultPage(resultID, page).asScala
- }
- case TypedResult.ResultType.EOS => loop = false
- case TypedResult.ResultType.EMPTY =>
+ var resultId: String = null
+ try {
+ val resultDescriptor = executor.executeQuery(sessionId, operation)
+
+ resultId = resultDescriptor.getResultId
+
+ val rows = new ArrayBuffer[Row]()
+ var loop = true
+
+ while (loop) {
+ Thread.sleep(50) // slow the processing down
+
+ val result = executor.snapshotResult(sessionId, resultId, 2)
+ result.getType match {
+ case TypedResult.ResultType.PAYLOAD =>
+ rows.clear()
+ (1 to result.getPayload).foreach { page =>
+ rows ++= executor.retrieveResultPage(resultId, page).asScala
+ }
+ case TypedResult.ResultType.EOS => loop = false
+ case TypedResult.ResultType.EMPTY =>
+ }
}
- }
- resultSet = ResultSet.builder
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .columns(resultDescriptor.getResultSchema.getColumns)
- .data(rows.toArray[Row])
- .build
- setState(OperationState.FINISHED)
+ resultSet = ResultSet.builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(resultDescriptor.getResultSchema.getColumns)
+ .data(rows.toArray[Row])
+ .build
+ setState(OperationState.FINISHED)
+
+ } finally {
+ if (resultId != null) {
+ cleanupQueryResult(resultId)
+ }
+ }
}
private def runSetOperation(setOperation: SetOperation): Unit = {
@@ -213,6 +222,15 @@ class ExecuteStatement(
setState(OperationState.FINISHED)
}
+ private def cleanupQueryResult(resultId: String): Unit = {
+ try {
+ executor.cancelQuery(sessionId, resultId)
+ } catch {
+ case t: Throwable =>
+ warn(s"Failed to clean result set $resultId in session $sessionId", t)
+ }
+ }
+
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =