You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/14 02:34:23 UTC
[incubator-kyuubi] branch branch-1.5 updated: [KYUUBI #2872] Catch the exception for the iterator job when incremental collect is enabled
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 659ced1ae [KYUUBI #2872] Catch the exception for the iterator job when incremental collect is enabled
659ced1ae is described below
commit 659ced1aecbb11ccac871f3828826e87a6b8bcb1
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Tue Jun 14 10:33:36 2022 +0800
[KYUUBI #2872] Catch the exception for the iterator job when incremental collect is enabled
### _Why are the changes needed?_
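Catch the exception for the iterator job when incremental collect is enabled: `getNextRowSet` now runs inside `withLocalProperties`, and any failure raised while fetching rows is routed through `onError(cancel = true)`.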
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2872 from turboFei/catch_exception.
Closes #2872
aa7bf2be [Fei Wang] refactor
b207b28b [Fei Wang] catch exception
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
(cherry picked from commit 383a7a84af1a54fffb4f075fba603017dc9a4fc4)
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../engine/spark/operation/SparkOperation.scala | 32 +++++++++++++---------
1 file changed, 19 insertions(+), 13 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 314b2276b..dc7e8a313 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -149,20 +149,26 @@ abstract class SparkOperation(opType: OperationType, session: Session)
override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(resultSchema)
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
- validateDefaultFetchOrientation(order)
- assertState(OperationState.FINISHED)
- setHasResultSet(true)
- order match {
- case FETCH_NEXT => iter.fetchNext()
- case FETCH_PRIOR => iter.fetchPrior(rowSetSize);
- case FETCH_FIRST => iter.fetchAbsolute(0);
+ override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet =
+ withLocalProperties {
+ var resultRowSet: TRowSet = null
+ try {
+ validateDefaultFetchOrientation(order)
+ assertState(OperationState.FINISHED)
+ setHasResultSet(true)
+ order match {
+ case FETCH_NEXT => iter.fetchNext()
+ case FETCH_PRIOR => iter.fetchPrior(rowSetSize);
+ case FETCH_FIRST => iter.fetchAbsolute(0);
+ }
+ val taken = iter.take(rowSetSize)
+ resultRowSet =
+ RowSet.toTRowSet(taken.toList, resultSchema, getProtocolVersion, timeZone)
+ resultRowSet.setStartRowOffset(iter.getPosition)
+ } catch onError(cancel = true)
+
+ resultRowSet
}
- val taken = iter.take(rowSetSize)
- val resultRowSet = RowSet.toTRowSet(taken.toList, resultSchema, getProtocolVersion, timeZone)
- resultRowSet.setStartRowOffset(iter.getPosition)
- resultRowSet
- }
override def shouldRunAsync: Boolean = false
}