You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/14 12:46:37 UTC

[kyuubi] branch master updated: [KYUUBI #4711] JDBC client should catch task failed exception instead of NPE in the incremental mode

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new db46b5b32 [KYUUBI #4711] JDBC client should catch task failed exception instead of NPE in the incremental mode
db46b5b32 is described below

commit db46b5b320ffc3e58f84a0c3bb0d113783b9612b
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Fri Apr 14 20:46:28 2023 +0800

    [KYUUBI #4711] JDBC client should catch task failed exception instead of NPE in the incremental mode
    
    ### _Why are the changes needed?_
    
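    Since the job is submitted lazily in the incremental mode, a task failure may only surface while results are being fetched, after the operation has already reached a terminal state. The engine should therefore not swallow the task failed exception in that state, but rethrow it so that the JDBC client receives the real error instead of an NPE.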
    
    Before this PR:
    
    ```
    0: jdbc:hive2://0.0.0.0:10009/> set kyuubi.operation.incremental.collect=true;
    +---------------------------------------+--------+
    |                  key                  | value  |
    +---------------------------------------+--------+
    | kyuubi.operation.incremental.collect  | true   |
    +---------------------------------------+--------+
    0: jdbc:hive2://0.0.0.0:10009/> SELECT raise_error('custom error message');
    Error:  (state=,code=0)
    0: jdbc:hive2://0.0.0.0:10009/>
    ```
    
    Kyuubi server log:
    
    ```
    2023-04-14 18:47:50.185 ERROR org.apache.kyuubi.server.KyuubiTBinaryFrontendService: Error fetching results:
    java.lang.NullPointerException: null
            at org.apache.kyuubi.server.BackendServiceMetric.$anonfun$fetchResults$1(BackendServiceMetric.scala:191) ~[classes/:?]
            at org.apache.kyuubi.metrics.MetricsSystem$.timerTracing(MetricsSystem.scala:111) ~[classes/:?]
            at org.apache.kyuubi.server.BackendServiceMetric.fetchResults(BackendServiceMetric.scala:187) ~[classes/:?]
            at org.apache.kyuubi.server.BackendServiceMetric.fetchResults$(BackendServiceMetric.scala:182) ~[classes/:?]
            at org.apache.kyuubi.server.KyuubiServer$$anon$1.fetchResults(KyuubiServer.scala:147) ~[classes/:?]
            at org.apache.kyuubi.service.TFrontendService.FetchResults(TFrontendService.scala:530) [classes/:?]
    ```
    
    After this PR:
    
    ```
    0: jdbc:hive2://0.0.0.0:10009/> set kyuubi.operation.incremental.collect=true;
    +---------------------------------------+--------+
    |                  key                  | value  |
    +---------------------------------------+--------+
    | kyuubi.operation.incremental.collect  | true   |
    +---------------------------------------+--------+
    0: jdbc:hive2://0.0.0.0:10009/> SELECT raise_error('custom error message');
    Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3) (0.0.0.0 executor driver): java.lang.RuntimeException: custom error message
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
            at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
            at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
            at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:136)
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    
    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
            at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
            at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
            at scala.Option.foreach(Option.scala:407)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
            at org.apache.spark.rdd.RDD.collectPartition$1(RDD.scala:1036)
            at org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3(RDD.scala:1038)
            at org.apache.spark.rdd.RDD.$anonfun$toLocalIterator$3$adapted(RDD.scala:1038)
            at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
            at org.apache.kyuubi.operation.IterableFetchIterator.hasNext(FetchIterator.scala:97)
            at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
            at scala.collection.Iterator.toStream(Iterator.scala:1417)
            at scala.collection.Iterator.toStream$(Iterator.scala:1416)
            at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
            at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
            at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
            at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
            at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$getNextRowSet$1(SparkOperation.scala:265)
            at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:155)
            at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
            at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:139)
            at org.apache.kyuubi.engine.spark.operation.SparkOperation.getNextRowSet(SparkOperation.scala:243)
            at org.apache.kyuubi.operation.OperationManager.getOperationNextRowSet(OperationManager.scala:141)
            at org.apache.kyuubi.session.AbstractSession.fetchResults(AbstractSession.scala:240)
            at org.apache.kyuubi.service.AbstractBackendService.fetchResults(AbstractBackendService.scala:214)
            at org.apache.kyuubi.service.TFrontendService.FetchResults(TFrontendService.scala:530)
            at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1837)
            at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1822)
            at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
            at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
            at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
            at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.RuntimeException: custom error message
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
            at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
            at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
            at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:136)
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
            ... 3 more (state=,code=0)
    0: jdbc:hive2://0.0.0.0:10009/>
    ```
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4711 from cfmcgrady/incremental-show-error-msg.
    
    Closes #4711
    
    66bb527ce [Fu Chen] JDBC client should catch task failed exception in the incremental mode
    
    Authored-by: Fu Chen <cf...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kyuubi/engine/spark/operation/SparkOperation.scala       |  5 +++--
 .../kyuubi/operation/KyuubiOperationPerConnectionSuite.scala | 12 +++++++++++-
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index eb58407d4..cb7510a89 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -181,8 +181,9 @@ abstract class SparkOperation(session: Session)
           setOperationException(ke)
           throw ke
         } else if (isTerminalState(state)) {
-          setOperationException(KyuubiSQLException(errMsg))
-          warn(s"Ignore exception in terminal state with $statementId: $errMsg")
+          val ke = KyuubiSQLException(errMsg)
+          setOperationException(ke)
+          throw ke
         } else {
           error(s"Error operating $opType: $errMsg", e)
           val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index d04afbfb5..d0f1f065d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
 import org.apache.kyuubi.engine.ApplicationState
 import org.apache.kyuubi.jdbc.KyuubiHiveDriver
-import org.apache.kyuubi.jdbc.hive.KyuubiConnection
+import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException}
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.plugin.SessionConfAdvisor
 import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
@@ -281,6 +281,16 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
       assert(rs.getString(2) === KYUUBI_VERSION)
     }
   }
+
+  test("JDBC client should catch task failed exception in the incremental mode") {
+    withJdbcStatement() { statement =>
+      statement.executeQuery(s"set ${KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key}=true;")
+      val resultSet = statement.executeQuery(
+        "SELECT raise_error('client should catch this exception');")
+      val e = intercept[KyuubiSQLException](resultSet.next())
+      assert(e.getMessage.contains("client should catch this exception"))
+    }
+  }
 }
 
 class TestSessionConfAdvisor extends SessionConfAdvisor {