You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by vi...@apache.org on 2022/02/09 03:31:21 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1838] Clean up query results after query operations finish

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 03c84f8  [KYUUBI #1838] Clean up query results after query operations finish
03c84f8 is described below

commit 03c84f8342d2af2c5b95c68d212511ca80a319e5
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Wed Feb 9 11:31:09 2022 +0800

    [KYUUBI #1838] Clean up query results after query operations finish
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    Query results are cached in memory, so we should clean them up once the query operation finishes and all rows have been fetched.
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    This is a sub-task of KPIP-2 #1322.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1859 from link3280/feature/KYUUBI-1838.
    
    Closes #1838
    
    3cef0766 [Paul Lin] [KYUUBI #1838] Improve logging message syntax
    c6253de8 [Paul Lin] [KYUUBI #1838] Log cleanup exceptions
    e0b1866f [Paul Lin] [KYUUBI #1838] Clean up query results after query operations finish
    
    Authored-by: Paul Lin <pa...@gmail.com>
    Signed-off-by: yanghua <ya...@gmail.com>
---
 .../engine/flink/operation/ExecuteStatement.scala  | 68 ++++++++++++++--------
 1 file changed, 43 insertions(+), 25 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 51e8415..06ae1fa 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -119,33 +119,42 @@ class ExecuteStatement(
   }
 
   private def runQueryOperation(operation: QueryOperation): Unit = {
-    val resultDescriptor = executor.executeQuery(sessionId, operation)
-
-    val resultID = resultDescriptor.getResultId
-
-    val rows = new ArrayBuffer[Row]()
-    var loop = true
-    while (loop) {
-      Thread.sleep(50) // slow the processing down
-
-      val result = executor.snapshotResult(sessionId, resultID, 2)
-      result.getType match {
-        case TypedResult.ResultType.PAYLOAD =>
-          rows.clear()
-          (1 to result.getPayload).foreach { page =>
-            rows ++= executor.retrieveResultPage(resultID, page).asScala
-          }
-        case TypedResult.ResultType.EOS => loop = false
-        case TypedResult.ResultType.EMPTY =>
+    var resultId: String = null
+    try {
+      val resultDescriptor = executor.executeQuery(sessionId, operation)
+
+      resultId = resultDescriptor.getResultId
+
+      val rows = new ArrayBuffer[Row]()
+      var loop = true
+
+      while (loop) {
+        Thread.sleep(50) // slow the processing down
+
+        val result = executor.snapshotResult(sessionId, resultId, 2)
+        result.getType match {
+          case TypedResult.ResultType.PAYLOAD =>
+            rows.clear()
+            (1 to result.getPayload).foreach { page =>
+              rows ++= executor.retrieveResultPage(resultId, page).asScala
+            }
+          case TypedResult.ResultType.EOS => loop = false
+          case TypedResult.ResultType.EMPTY =>
+        }
       }
-    }
 
-    resultSet = ResultSet.builder
-      .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-      .columns(resultDescriptor.getResultSchema.getColumns)
-      .data(rows.toArray[Row])
-      .build
-    setState(OperationState.FINISHED)
+      resultSet = ResultSet.builder
+        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(resultDescriptor.getResultSchema.getColumns)
+        .data(rows.toArray[Row])
+        .build
+      setState(OperationState.FINISHED)
+
+    } finally {
+      if (resultId != null) {
+        cleanupQueryResult(resultId)
+      }
+    }
   }
 
   private def runSetOperation(setOperation: SetOperation): Unit = {
@@ -213,6 +222,15 @@ class ExecuteStatement(
     setState(OperationState.FINISHED)
   }
 
+  private def cleanupQueryResult(resultId: String): Unit = {
+    try {
+      executor.cancelQuery(sessionId, resultId)
+    } catch {
+      case t: Throwable =>
+        warn(s"Failed to clean result set $resultId in session $sessionId", t)
+    }
+  }
+
   private def addTimeoutMonitor(): Unit = {
     if (queryTimeout > 0) {
       val timeoutExecutor =