Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/14 02:34:23 UTC

[incubator-kyuubi] branch branch-1.5 updated: [KYUUBI #2872] Catch the exception for the iterator job when incremental collect is enabled

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 659ced1ae [KYUUBI #2872] Catch the exception for the iterator job when incremental collect is enabled
659ced1ae is described below

commit 659ced1aecbb11ccac871f3828826e87a6b8bcb1
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Tue Jun 14 10:33:36 2022 +0800

    [KYUUBI #2872] Catch the exception for the iterator job when incremental collect is enabled
    
    ### _Why are the changes needed?_
    
    Catch the exception for the iterator job when incremental collect is enabled. With incremental collect, result rows are computed lazily, so the underlying Spark job can fail while `getNextRowSet` is consuming the iterator; catching the exception there routes such failures through the operation's error handler instead of letting them escape unhandled.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2872 from turboFei/catch_exception.
    
    Closes #2872
    
    aa7bf2be [Fei Wang] refactor
    b207b28b [Fei Wang] catch exception
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
    (cherry picked from commit 383a7a84af1a54fffb4f075fba603017dc9a4fc4)
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .../engine/spark/operation/SparkOperation.scala    | 32 +++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)
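
Besides adding the try/catch, the patch moves the fetch logic inside withLocalProperties (visible in the diff below). As a rough, hypothetical sketch — not Kyuubi's actual implementation — a wrapper like that typically sets per-thread Spark local properties around a block and restores them afterwards. The object name, signature, and property handling here are illustrative only; the SparkContext calls are real Spark APIs:

import org.apache.spark.SparkContext

object LocalPropertiesSketch {
  // Hypothetical loan-pattern helper (illustrative, not Kyuubi's code):
  // set per-thread Spark local properties around `f`, then restore the
  // previous values (Spark treats a null value as "clear the property").
  def withLocalProperties[T](sc: SparkContext, props: Map[String, String])(f: => T): T = {
    val saved = props.keys.map(k => k -> sc.getLocalProperty(k)).toMap
    try {
      props.foreach { case (k, v) => sc.setLocalProperty(k, v) }
      f
    } finally {
      saved.foreach { case (k, v) => sc.setLocalProperty(k, v) }
    }
  }
}

Wrapping getNextRowSet this way matters because, with incremental collect, Spark jobs are launched from the fetch thread, so the local properties (scheduler pool, job description, and the like) must be in place there, not only during the execute phase.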

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 314b2276b..dc7e8a313 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -149,20 +149,26 @@ abstract class SparkOperation(opType: OperationType, session: Session)
 
   override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(resultSchema)
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
-    validateDefaultFetchOrientation(order)
-    assertState(OperationState.FINISHED)
-    setHasResultSet(true)
-    order match {
-      case FETCH_NEXT => iter.fetchNext()
-      case FETCH_PRIOR => iter.fetchPrior(rowSetSize);
-      case FETCH_FIRST => iter.fetchAbsolute(0);
+  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet =
+    withLocalProperties {
+      var resultRowSet: TRowSet = null
+      try {
+        validateDefaultFetchOrientation(order)
+        assertState(OperationState.FINISHED)
+        setHasResultSet(true)
+        order match {
+          case FETCH_NEXT => iter.fetchNext()
+          case FETCH_PRIOR => iter.fetchPrior(rowSetSize);
+          case FETCH_FIRST => iter.fetchAbsolute(0);
+        }
+        val taken = iter.take(rowSetSize)
+        resultRowSet =
+          RowSet.toTRowSet(taken.toList, resultSchema, getProtocolVersion, timeZone)
+        resultRowSet.setStartRowOffset(iter.getPosition)
+      } catch onError(cancel = true)
+
+      resultRowSet
     }
-    val taken = iter.take(rowSetSize)
-    val resultRowSet = RowSet.toTRowSet(taken.toList, resultSchema, getProtocolVersion, timeZone)
-    resultRowSet.setStartRowOffset(iter.getPosition)
-    resultRowSet
-  }
 
   override def shouldRunAsync: Boolean = false
 }
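
For illustration only: the shape of the change — a try block whose catch clause is the PartialFunction returned by onError(cancel = true), which plain Scala accepts directly as a handler — can be sketched outside Kyuubi. Everything below (LazyFetchSketch, fetchNextBatch, the stand-in onError) is hypothetical; the point it demonstrates is that with lazy row production the failure only surfaces when the fetch pulls the offending row:

object LazyFetchSketch {
  // Stand-in for SparkOperation.onError (hypothetical): a PartialFunction
  // that Scala accepts directly as a `catch` clause.
  def onError(cancel: Boolean): PartialFunction[Throwable, Unit] = {
    case e: Exception =>
      if (cancel) println("would cancel the running Spark job group here")
      throw new RuntimeException(s"Error fetching results: ${e.getMessage}", e)
  }

  // Mirrors the patched getNextRowSet: consume the lazily evaluated
  // iterator inside try/catch so failures surface through the handler.
  def fetchNextBatch(rows: Iterator[Int], size: Int): List[Int] = {
    var batch: List[Int] = Nil
    try {
      batch = rows.take(size).toList // lazy evaluation can throw right here
    } catch onError(cancel = true)
    batch
  }

  def main(args: Array[String]): Unit = {
    // A fresh lazy iterator per call; row 5 fails only when it is pulled.
    def lazyRows = Iterator.tabulate(10) { i =>
      if (i == 5) throw new IllegalStateException("job failed at row 5") else i
    }
    println(fetchNextBatch(lazyRows, 3)) // List(0, 1, 2): failure not reached
    fetchNextBatch(lazyRows, 8)          // pulls row 5 and fails via onError
  }
}

The same property holds in the real patch: with incremental collect enabled, the result iterator drives Spark jobs on demand, so guarding the fetch path (rather than only statement execution) is what lets onError report, and optionally cancel, a job that fails mid-fetch.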